File: sequence_mixin.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (388 lines) | stat: -rw-r--r-- 19,014 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# -*- coding: utf-8 -*-
from datetime import date

from odoo import api, fields, models, _
from odoo.exceptions import ValidationError
from odoo.tools.misc import format_date
from odoo.tools import frozendict, mute_logger, date_utils, SQL

import re
from collections import defaultdict
from psycopg2 import errors as pgerrors


class SequenceMixin(models.AbstractModel):
    """Mechanism used to have an editable sequence number.

    The sequence is kept as a plain string in the field named by
    ``_sequence_field``. Its layout (prefixes, year, month, counter, suffix)
    is deduced from existing values with the regexes declared below.

    Be careful of how you use this regarding the prefixes. More info in the
    docstring of _get_last_sequence.
    """

    _name = 'sequence.mixin'
    _description = "Automatic sequence"

    # Name of the field holding the sequence string.
    _sequence_field = "name"
    # Name of the date field the year/month parts of the sequence are checked against.
    _sequence_date_field = "date"
    # Leading column of the indexes created in `init`; no index is created while this is falsy.
    _sequence_index = False

    # Building blocks (named groups) shared by the sequence regexes below.
    prefix = r'(?P<prefix1>.*?)'
    prefix2 = r'(?P<prefix2>\D)'
    prefix3 = r'(?P<prefix3>\D+?)'
    seq = r'(?P<seq>\d*)'
    month = r'(?P<month>(0[1-9]|1[0-2]))'
    # A year must follow a non-digit or the start of the string. It is either
    # 4 digits starting with 19, 20 or 21, or 2 digits followed by a non-digit.
    year = r'(?P<year>((?<=\D)|(?<=^))((19|20|21)\d{2}|(\d{2}(?=\D))))'
    year_end = r'(?P<year_end>((?<=\D)|(?<=^))((19|20|21)\d{2}|(\d{2}(?=\D))))'
    suffix = r'(?P<suffix>\D*?)'

    # One regex per supported reset periodicity; `_deduce_sequence_number_reset`
    # tries them in turn to classify a sequence string.
    _sequence_year_range_monthly_regex = fr'^{prefix}{year}{prefix2}{year_end}(?P<prefix3>\D){month}(?P<prefix4>\D+?){seq}{suffix}$'
    _sequence_year_range_regex = fr'^(?:{prefix}{year}{prefix2}{year_end}{prefix3})?{seq}{suffix}$'
    _sequence_monthly_regex = fr'^{prefix}{year}(?P<prefix2>\D*?){month}{prefix3}{seq}{suffix}$'
    _sequence_yearly_regex = fr'^{prefix}(?P<year>((?<=\D)|(?<=^))((19|20|21)?\d{{2}}))(?P<prefix2>\D+?){seq}{suffix}$'
    _sequence_fixed_regex = fr'^{prefix}(?P<seq>\d{{0,9}}){suffix}$'

    # Stored split of the sequence string, filled by `_compute_split_sequence`:
    # everything before the counter, and the counter itself as an integer.
    sequence_prefix = fields.Char(compute='_compute_split_sequence', store=True)
    sequence_number = fields.Integer(compute='_compute_split_sequence', store=True)

    def init(self):
        """Create the indexes that speed up the search for the highest sequence number.

        Nothing happens for abstract models, when ``_sequence_index`` is falsy,
        or when the first index is already listed in ``pg_indexes``.
        """
        if self._abstract or not self._sequence_index:
            return

        cr = self.env.cr
        index_name = self._table + '_sequence_index'
        cr.execute(SQL('SELECT indexname FROM pg_indexes WHERE indexname = %s', index_name))
        if cr.fetchone():
            # the index already exists, nothing to create
            return

        cr.execute(SQL("""
                    CREATE INDEX %(index_name)s ON %(table)s (%(sequence_index)s, sequence_prefix desc, sequence_number desc, %(field)s);
                    CREATE INDEX %(index2_name)s ON %(table)s (%(sequence_index)s, id desc, sequence_prefix);
                    """,
            sequence_index=SQL.identifier(self._sequence_index),
            index_name=SQL.identifier(index_name),
            index2_name=SQL.identifier(index_name + "2"),
            table=SQL.identifier(self._table),
            field=SQL.identifier(self._sequence_field),
        ))

    def _get_sequence_date_range(self, reset):
        """Return the period covered by the sequence of this record.

        :param reset: the reset periodicity, as returned by
            ``_deduce_sequence_number_reset``.
        :return: a tuple ``(date_start, date_end, forced_year_start, forced_year_end)``;
            the two forced years are always ``None`` here.
        :raise NotImplementedError: when ``reset`` is not a known periodicity.
        """
        reference = fields.Date.to_date(self[self._sequence_date_field])
        if reset == 'never':
            # no reset at all: the period is unbounded
            return (date.min, date.max, None, None)
        if reset == 'month':
            return (*date_utils.get_month(reference), None, None)
        if reset in ('year', 'year_range', 'year_range_month'):
            year = reference.year
            return (date(year, 1, 1), date(year, 12, 31), None, None)
        raise NotImplementedError(reset)

    def _must_check_constrains_date_sequence(self):
        """Tell whether the date/sequence consistency constraint applies to this record.

        `_constrains_date_sequence` skips every record for which this returns
        a falsy value. The base implementation always checks.

        :return: True
        """
        return True

    def _year_match(self, format_value, year):
        return format_value == self._truncate_year_to_length(year, len(str(format_value)))

    def _truncate_year_to_length(self, year, length):
        return year % (10 ** length)

    def _sequence_matches_date(self):
        """Tell whether the year/month found in the sequence agree with the record date.

        A record without a sequence or without a date always matches.

        :return: True if the sequence is consistent with the date.
        """
        self.ensure_one()
        record_date = fields.Date.to_date(self[self._sequence_date_field])
        sequence = self[self._sequence_field]
        if not (sequence and record_date):
            return True

        _format_string, values = self._get_sequence_format_param(sequence)
        reset = self._deduce_sequence_number_reset(sequence)
        date_start, date_end, forced_year_start, forced_year_end = self._get_sequence_date_range(reset)

        # a part equal to 0 is not part of the sequence: nothing to compare
        seq_year = values["year"]
        if seq_year and not self._year_match(seq_year, forced_year_start or date_start.year):
            return False
        seq_year_end = values["year_end"]
        if seq_year_end and not self._year_match(seq_year_end, forced_year_end or date_end.year):
            return False
        seq_month = values['month']
        return not seq_month or seq_month == record_date.month

    @api.constrains(lambda self: (self._sequence_field, self._sequence_date_field))
    def _constrains_date_sequence(self):
        """Reject records whose sequence does not agree with their date.

        :raise ValidationError: when a checked record has a sequence and a date
            that do not match.
        """
        # The `sequence.mixin.constraint_start_date` parameter makes it possible to
        # bypass the constraint, to allow the edition of already messed up documents.
        # /!\ Do not use this to completely disable the constraint as it will make this mixin unreliable.
        start_date_param = self.env['ir.config_parameter'].sudo().get_param(
            'sequence.mixin.constraint_start_date',
            '1970-01-01'
        )
        constraint_date = fields.Date.to_date(start_date_param)
        for record in self:
            if not record._must_check_constrains_date_sequence():
                continue
            date_value = fields.Date.to_date(record[record._sequence_date_field])
            sequence = record[record._sequence_field]
            if not (sequence and date_value and date_value > constraint_date):
                # nothing to check, or the record is not after the configured date
                continue
            if record._sequence_matches_date():
                continue
            raise ValidationError(_(
                "The %(date_field)s (%(date)s) you've entered isn't aligned with the existing sequence number (%(sequence)s). Clear the sequence number to proceed.\n"
                "To maintain date-based sequences, select entries and use the resequence option from the actions menu, available in developer mode.",
                date_field=record._fields[record._sequence_date_field]._description_string(self.env),
                date=format_date(self.env, date_value),
                sequence=sequence,
            ))

    @api.depends(lambda self: [self._sequence_field])
    def _compute_split_sequence(self):
        """Split the sequence into ``sequence_prefix`` and ``sequence_number``.

        The prefix is everything before the counter matched by the fixed regex;
        the number is that counter as an integer (0 when it is empty).
        """
        # make the seq the only matching group; the pattern is the same for
        # every record, so it is built once
        seq_only_regex = self._sequence_fixed_regex.replace(r"?P<seq>", "")
        pattern = re.compile(self._make_regex_non_capturing(seq_only_regex))
        for record in self:
            sequence = record[record._sequence_field] or ''
            found = pattern.match(sequence)
            seq_start = found.start(1)
            record.sequence_prefix = sequence[:seq_start]
            record.sequence_number = int(found.group(1) or 0)

    @api.model
    def _deduce_sequence_number_reset(self, name):
        """Detect if the used sequence resets yearly, montly or never.

        :param name: the sequence that is used as a reference to detect the resetting
            periodicity. Typically, it is the last before the one you want to give a
            sequence.
        """
        for regex, ret_val, requirements in [
            (self._sequence_year_range_monthly_regex, 'year_range_month', ['seq', 'year', 'year_end', 'month']),
            (self._sequence_monthly_regex, 'month', ['seq', 'month', 'year']),
            (self._sequence_year_range_regex, 'year_range', ['seq', 'year', 'year_end']),
            (self._sequence_yearly_regex, 'year', ['seq', 'year']),
            (self._sequence_fixed_regex, 'never', ['seq']),
        ]:
            match = re.match(regex, name or '')
            if match:
                groupdict = match.groupdict()
                if (
                    groupdict.get('year_end') and groupdict.get('year')
                    and (
                        len(groupdict['year']) < len(groupdict['year_end'])
                        or self._truncate_year_to_length((int(groupdict['year']) + 1), len(groupdict['year_end'])) != int(groupdict['year_end'])
                    )
                ):
                    # year and year_end are not compatible for range (the difference is not 1)
                    continue
                if all(groupdict.get(req) is not None for req in requirements):
                    return ret_val
        raise ValidationError(_(
            'The sequence regex should at least contain the seq grouping keys. For instance:\n'
            r'^(?P<prefix1>.*?)(?P<seq>\d*)(?P<suffix>\D*?)$'
        ))

    def _make_regex_non_capturing(self, regex):
        r""" Replace the "named capturing group" found in the regex by
        "non-capturing group" instead.

        Example:
        `^(?P<prefix1>.*?)(?P<seq>\d{0,9})(?P<suffix>\D*?)$` will become
        `^(?:.*?)(?:\d{0,9})(?:\D*?)$`
        - `(?P<name>...)` = Named capturing groups
        - `(?:...)` = Non-capturing group

        :param regex: the regex to modify

        :return: the modified regex
        """
        return re.sub(r"\?P<\w+>", "?:", regex)

    def _get_last_sequence_domain(self, relaxed=False):
        """Get the SQL domain to retrieve the previous sequence number.

        This function should be overridden by models inheriting from this mixin.
        The base implementation returns an empty clause and no parameters.

        :param relaxed: see _get_last_sequence.

        :returns: tuple(where_string, where_params): with
            where_string: the entire SQL WHERE clause as a string.
            where_params: a dictionary containing the parameters to substitute
                at the execution of the query.
        """
        self.ensure_one()
        return "", {}

    def _get_starting_sequence(self):
        """Get a default sequence number.

        This function should be overridden by models inheriting from this mixin.
        This number will be incremented so you probably want to start the sequence at 0.

        :return: string to use as the default sequence to increment
        """
        self.ensure_one()
        return "00000000"

    def _get_last_sequence(self, relaxed=False, with_prefix=None):
        """Retrieve the previous sequence.

        The search is restricted to the domain of _get_last_sequence_domain,
        excluding the current record. Among those rows, the query keeps the
        prefix of the row with the highest id and returns the sequence with the
        highest number for that prefix. This means that the prefix has a huge
        importance: changing it in the middle of a period changes which chain of
        numbers is continued, so it is best done when the numbering makes a new
        start (i.e. a new year).

        :param relaxed: this should be set to True when a previous request didn't find
            something without. This allows to find a pattern from a previous period, and
            try to adapt it for the new period.
        :param with_prefix: The sequence prefix to restrict the search on, if any.

        :return: the string of the previous sequence or None if there wasn't any.
        :raise ValidationError: when the sequence field is not a stored field.
        """
        self.ensure_one()
        if not (self._sequence_field in self._fields and self._fields[self._sequence_field].store):
            raise ValidationError(_('%s is not a stored field', self._sequence_field))

        where_string, params = self._get_last_sequence_domain(relaxed)
        current_id = self._origin.id
        if current_id:
            # never consider the record itself as its own predecessor
            where_string += " AND id != %(id)s "
            params['id'] = current_id
        if with_prefix is not None:
            where_string += " AND sequence_prefix = %(with_prefix)s "
            params['with_prefix'] = with_prefix

        # the query reads the database directly: pending values must be written first
        self.flush_model([self._sequence_field, 'sequence_number', 'sequence_prefix'])

        query = f"""
                SELECT {self._sequence_field} FROM {self._table}
                {where_string}
                AND sequence_prefix = (SELECT sequence_prefix FROM {self._table} {where_string} ORDER BY id DESC LIMIT 1)
                ORDER BY sequence_number DESC
                LIMIT 1
        """
        self.env.cr.execute(query, params)
        row = self.env.cr.fetchone()
        return row[0] if row else None

    def _get_sequence_format_param(self, previous):
        """Get the python format and format values for the sequence.

        :param previous: the sequence we want to extract the format from
        :return tuple(format, format_values):
            format is the format string on which we should call .format()
            format_values is the dict of values to format the `format` string
            ``format.format(**format_values)`` should be equal to ``previous``
        """
        sequence_number_reset = self._deduce_sequence_number_reset(previous)
        regex = self._sequence_fixed_regex
        if sequence_number_reset == 'year':
            regex = self._sequence_yearly_regex
        elif sequence_number_reset == 'year_range':
            regex = self._sequence_year_range_regex
        elif sequence_number_reset == 'month':
            regex = self._sequence_monthly_regex
        elif sequence_number_reset == 'year_range_month':
            regex = self._sequence_year_range_monthly_regex
        format_values = re.match(regex, previous).groupdict()
        format_values['seq_length'] = len(format_values['seq'])
        format_values['year_length'] = len(format_values.get('year') or '')
        format_values['year_end_length'] = len(format_values.get('year_end') or '')
        if not format_values.get('seq') and 'prefix1' in format_values and 'suffix' in format_values:
            # if we don't have a seq, consider we only have a prefix and not a suffix
            format_values['prefix1'] = format_values['suffix']
            format_values['suffix'] = ''
        for field in ('seq', 'year', 'month', 'year_end'):
            format_values[field] = int(format_values.get(field) or 0)

        placeholders = re.findall(r'\b(prefix\d|seq|suffix\d?|year|year_end|month)\b', regex)
        format = ''.join(
            "{seq:0{seq_length}d}" if s == 'seq' else
            "{month:02d}" if s == 'month' else
            "{year:0{year_length}d}" if s == 'year' else
            "{year_end:0{year_end_length}d}" if s == 'year_end' else
            "{%s}" % s
            for s in placeholders
        )
        return format, format_values

    def _set_next_sequence(self):
        """Set the next sequence.

        This method ensures that the field is set both in the ORM and in the database.
        This is necessary because we use a database query to get the previous sequence,
        and we need that query to always be executed on the latest data.

        The candidate number is incremented and written again for as long as the
        database rejects it with a unique or exclusion violation.
        """
        self.ensure_one()
        last_sequence = self._get_last_sequence()
        # `new` means nothing was found with the strict domain: the format is then
        # taken from the relaxed search or from the starting sequence, and the
        # counter/date parts are reset below.
        new = not last_sequence
        if new:
            last_sequence = self._get_last_sequence(relaxed=True) or self._get_starting_sequence()

        format_string, format_values = self._get_sequence_format_param(last_sequence)
        sequence_number_reset = self._deduce_sequence_number_reset(last_sequence)
        if new:
            date_start, date_end, forced_year_start, forced_year_end = self._get_sequence_date_range(sequence_number_reset)
            # restart the counter (it is incremented before its first use) and
            # take the year/month parts from the date of the record
            format_values['seq'] = 0
            format_values['year'] = self._truncate_year_to_length(forced_year_start or date_start.year, format_values['year_length'])
            format_values['year_end'] = self._truncate_year_to_length(forced_year_end or date_end.year, format_values['year_end_length'])
            format_values['month'] = self[self._sequence_date_field].month

        # before flushing inside the savepoint (which may be rolled back!), make sure everything
        # is already flushed, otherwise we could lose non-sequence fields values, as the ORM believes
        # them to be flushed.
        self.flush_recordset()
        # because we are flushing, and because the business code might be flushing elsewhere (i.e. to
        # validate constraints), the fields depending on the sequence field might be protected by the
        # ORM. This is not desired, so we already reset them here.
        registry = self.env.registry
        triggers = registry._field_triggers[self._fields[self._sequence_field]]
        for inverse_field, triggered_fields in triggers.items():
            for triggered_field in triggered_fields:
                # only stored computed fields are marked for recomputation
                if not triggered_field.store or not triggered_field.compute:
                    continue
                for field in registry.field_inverses[inverse_field[0]] if inverse_field else [None]:
                    self.env.add_to_compute(triggered_field, self[field.name] if field else self)
        while True:
            # try the next number; on a unique/exclusion violation the savepoint
            # is left with the exception and the loop tries the following number
            format_values['seq'] = format_values['seq'] + 1
            sequence = format_string.format(**format_values)
            try:
                with self.env.cr.savepoint(flush=False), mute_logger('odoo.sql_db'):
                    self[self._sequence_field] = sequence
                    self.flush_recordset([self._sequence_field])
                    break
            except (pgerrors.ExclusionViolation, pgerrors.UniqueViolation):
                pass
        # refresh the stored prefix/number and write them to the database right away
        self._compute_split_sequence()
        self.flush_recordset(['sequence_prefix', 'sequence_number'])


    def _is_last_from_seq_chain(self):
        """Tells whether or not this element is the last one of the sequence chain.

        The record is the last one when no other record is found for its prefix,
        or when its own sequence is exactly the successor of the highest
        sequence found among the other records.

        :return: True if it is the last element of the chain.
        """
        last_sequence = self._get_last_sequence(with_prefix=self.sequence_prefix)
        if not last_sequence:
            return True
        seq_format, seq_format_values = self._get_sequence_format_param(last_sequence)
        seq_format_values['seq'] += 1
        # Read the configured sequence field instead of a hard-coded `name`, so
        # that models overriding `_sequence_field` are compared on the right value.
        return seq_format.format(**seq_format_values) == self[self._sequence_field]

    def _is_end_of_seq_chain(self):
        """Tells whether or not these elements are the last ones of the sequence chain.

        Records are grouped by sequence format and format values (counter
        excluded). Each group must hold consecutive numbers, and its highest
        record must be the last one of its chain.

        :return: True if self are the last elements of the chain.
        """
        def empty_batch():
            return {'last_rec': self.browse(), 'seq_list': []}

        batched = defaultdict(empty_batch)
        for record in self.filtered(lambda rec: rec[rec._sequence_field]):
            seq_format, format_values = record._get_sequence_format_param(record[record._sequence_field])
            number = format_values.pop('seq')
            batch = batched[(seq_format, frozendict(format_values))]
            batch['seq_list'].append(number)
            if batch['last_rec'].sequence_number <= record.sequence_number:
                batch['last_rec'] = record

        for batch in batched.values():
            numbers = batch['seq_list']
            # consecutive numbers span exactly as many values as there are records
            is_sequential = max(numbers) - min(numbers) == len(numbers) - 1
            # last_rec must have the highest number in the database
            if not is_sequential or not batch['last_rec']._is_last_from_seq_chain():
                return False
        return True