File: account_edi_document.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (288 lines) | stat: -rw-r--r-- 14,134 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import logging

import psycopg2.errors

from odoo import models, fields, api, _
from odoo.exceptions import UserError


_logger = logging.getLogger(__name__)

DEFAULT_BLOCKING_LEVEL = 'error'


class AccountEdiDocument(models.Model):
    """Electronic document linking one account.move to one EDI format.

    A SQL constraint guarantees at most one document per (EDI format, move) pair.
    The `state` field tracks the flow of the document ('to_send' -> 'sent',
    'to_cancel' -> 'cancelled'), while `error` and `blocking_level` hold the
    outcome of the last processing attempt.
    """
    _name = 'account.edi.document'
    _description = 'Electronic Document for an account.move'

    # == Stored fields ==
    # Journal entry this document belongs to; the document is deleted together with the move.
    move_id = fields.Many2one('account.move', required=True, ondelete='cascade', index=True)
    # EDI format in charge of generating / sending / cancelling this document.
    edi_format_id = fields.Many2one('account.edi.format', required=True)
    # Generated file; only readable by members of 'base.group_system', hence the sudo() calls
    # made by the methods of this model when they access it.
    attachment_id = fields.Many2one(
        comodel_name='ir.attachment',
        groups='base.group_system',
        help="The file generated by edi_format_id when the invoice is posted (and this document is processed).",
    )
    # Only documents in 'to_send' or 'to_cancel' are turned into jobs by `_prepare_jobs`.
    state = fields.Selection([('to_send', 'To Send'), ('sent', 'Sent'), ('to_cancel', 'To Cancel'), ('cancelled', 'Cancelled')])
    error = fields.Html(help='The text of the last error that happened during Electronic Invoice operation.')
    # Documents whose blocking level is 'error' are skipped by `_prepare_jobs`.
    blocking_level = fields.Selection(
        selection=[('info', 'Info'), ('warning', 'Warning'), ('error', 'Error')],
        help="Blocks the current operation of the document depending on the error severity:\n"
        "  * Info: the document is not blocked and everything is working as it should.\n"
        "  * Warning: there is an error that doesn't prevent the current Electronic Invoicing operation to succeed.\n"
        "  * Error: there is an error that blocks the current Electronic Invoicing operation.")

    # == Not stored fields ==
    # Name of the linked attachment.
    name = fields.Char(related='attachment_id.name')
    edi_format_name = fields.Char(string='Format Name', related='edi_format_id.name')
    # Preview of the content of the document, computed with superuser rights (see `_compute_edi_content`).
    edi_content = fields.Binary(compute='_compute_edi_content', compute_sudo=True)

    # A move can only have a single document per EDI format.
    _sql_constraints = [
        (
            'unique_edi_document_by_move_by_format',
            'UNIQUE(edi_format_id, move_id)',
            'Only one edi document by move by format',
        ),
    ]

    @api.depends('move_id', 'error', 'state')
    def _compute_edi_content(self):
        """Compute the base64-encoded preview stored in `edi_content`.

        Documents that are neither 'to_send' nor 'to_cancel' get an empty content.
        Otherwise, the configuration errors reported by the EDI format take precedence
        (joined with newlines and UTF-8 encoded); when there are none, the content is
        produced by the optional 'edi_content' entry of the move applicability.
        """
        pending_states = ('to_send', 'to_cancel')
        for document in self:
            if document.state not in pending_states:
                document.edi_content = b''
                continue

            invoice = document.move_id
            edi_format = document.edi_format_id

            # Configuration problems are shown instead of the actual content.
            problems = edi_format._check_move_configuration(invoice)
            if problems:
                document.edi_content = base64.b64encode('\n'.join(problems).encode('UTF-8'))
                continue

            applicability = edi_format._get_move_applicability(invoice)
            content_builder = applicability.get('edi_content') if applicability else None
            if content_builder:
                document.edi_content = base64.b64encode(content_builder(invoice))
            else:
                document.edi_content = b''

    def action_export_xml(self):
        """Return the URL action pointing at the `edi_content` of this single document."""
        self.ensure_one()
        download_url = '/web/content/account.edi.document/%s/edi_content' % self.id
        return {'type': 'ir.actions.act_url', 'url': download_url}

    def _prepare_jobs(self):
        """Group the documents in self into jobs to be performed by '_process_job'.

        Only the documents in state 'to_send' or 'to_cancel' whose blocking level is not
        'error' are considered. Documents end up in the same job when they share the same
        EDI format, state, company and the values returned by the optional
        '<flow>_batching' callable of the move applicability ('post_batching' or
        'cancel_batching'). Without such a callable, the move id is part of the key, so
        documents of different moves are never grouped together.

        :returns:  [{
            'documents': account.edi.document,
            'method_to_call': the 'post' / 'cancel' entry of the move applicability (or None),
        }]
        """
        EdiDocument = self.env['account.edi.document']
        jobs_by_key = {}
        for state, edi_flow in (('to_send', 'post'), ('to_cancel', 'cancel')):
            batching_name = f'{edi_flow}_batching'
            candidates = self.filtered(lambda d: d.state == state and d.blocking_level != 'error')
            for edi_doc in candidates:
                move = edi_doc.move_id
                edi_format = edi_doc.edi_format_id
                applicability = edi_format._get_move_applicability(move) or {}

                # Either the format provides its own batching values or each move is isolated.
                batching_fn = applicability.get(batching_name)
                extra_key = batching_fn(move) if batching_fn else [move.id]
                key = (edi_format, state, move.company_id, *extra_key)

                if key not in jobs_by_key:
                    jobs_by_key[key] = {
                        'documents': EdiDocument,
                        'method_to_call': applicability.get(edi_flow),
                    }
                jobs_by_key[key]['documents'] |= edi_doc

        return list(jobs_by_key.values())

    @api.model
    def _process_job(self, job):
        """Post or cancel move_id by calling the related methods on edi_format_id.

        All the documents of the job must share the same EDI format, company and state.
        The result returned by the method is then written back on each document: on
        success the document moves to 'sent' / 'cancelled', otherwise the error and its
        blocking level are stored on the document.

        :param job:  {
            'documents': account.edi.document,
            'method_to_call': callable taking the moves and returning {move: result},
                              or a falsy value to consider every move as successful,
        }
        Each per-move result is a dict read through the optional keys 'success', 'error',
        'blocking_level' and 'attachment' (the latter only for the post flow).
        :raises ValueError: if the documents of the job do not share the same state.
        """
        def _failure_values(move_result):
            # Values written on a document whose operation did not succeed. A blocking level is only
            # stored together with an actual error message: both flows share this helper so that a
            # result carrying an empty 'error' never blocks a document without telling why.
            error = move_result.get('error', False)
            return {
                'error': error,
                'blocking_level': move_result.get('blocking_level', DEFAULT_BLOCKING_LEVEL) if error else False,
            }

        def _postprocess_post_edi_results(documents, edi_result):
            attachments_to_unlink = self.env['ir.attachment']
            for document in documents:
                move = document.move_id
                move_result = edi_result.get(move, {})
                if move_result.get('attachment'):
                    # attachment_id is restricted to system users, hence the sudo.
                    old_attachment = document.sudo().attachment_id
                    document.sudo().attachment_id = move_result['attachment']
                    if not old_attachment.res_model or not old_attachment.res_id:
                        attachments_to_unlink |= old_attachment
                if move_result.get('success') is True:
                    document.write({
                        'state': 'sent',
                        'error': False,
                        'blocking_level': False,
                    })
                else:
                    document.write(_failure_values(move_result))

            # Attachments that are not explicitly linked to a business model could be removed because they are not
            # supposed to have any traceability from the user.
            attachments_to_unlink.sudo().unlink()

        def _postprocess_cancel_edi_results(documents, edi_result):
            move_ids_to_cancel = set()  # Avoid duplicates
            attachments_to_unlink = self.env['ir.attachment']
            for document in documents:
                move = document.move_id
                move_result = edi_result.get(move, {})
                if move_result.get('success') is True:
                    old_attachment = document.sudo().attachment_id
                    document.sudo().write({
                        'state': 'cancelled',
                        'error': False,
                        'attachment_id': False,
                        'blocking_level': False,
                    })

                    if move.state == 'posted' and all(
                        doc.state == 'cancelled'
                        or not doc.edi_format_id._needs_web_services()
                        for doc in move.edi_document_ids
                    ):
                        # The user requested a cancellation of the EDI and it has been approved. Then, the invoice
                        # can be safely cancelled.
                        move_ids_to_cancel.add(move.id)

                    if not old_attachment.res_model or not old_attachment.res_id:
                        attachments_to_unlink |= old_attachment

                else:
                    document.write(_failure_values(move_result))

            if move_ids_to_cancel:
                invoices = self.env['account.move'].browse(list(move_ids_to_cancel))
                invoices.button_draft()
                invoices.button_cancel()

            # Attachments that are not explicitly linked to a business model could be removed because they are not
            # supposed to have any traceability from the user.
            attachments_to_unlink.sudo().unlink()

        documents = job['documents']
        if job['method_to_call']:
            method_to_call = job['method_to_call']
        else:
            # Nothing to call for this flow: every move is considered as successfully processed.
            method_to_call = lambda moves: {move: {'success': True} for move in moves}
        documents.edi_format_id.ensure_one()  # All account.edi.document of a job should have the same edi_format_id
        documents.move_id.company_id.ensure_one()  # All account.edi.document of a job should be from the same company
        if len(set(doc.state for doc in documents)) != 1:
            raise ValueError('All account.edi.document of a job should have the same state')

        state = documents[0].state
        documents.move_id.line_ids.flush_recordset()  # manual flush for tax details
        moves = documents.move_id
        if state == 'to_send':
            with moves._send_only_when_ready():
                edi_result = method_to_call(moves)
            _postprocess_post_edi_results(documents, edi_result)
        elif state == 'to_cancel':
            edi_result = method_to_call(moves)
            _postprocess_cancel_edi_results(documents, edi_result)

    def _process_documents_no_web_services(self):
        """Post and cancel the documents in self whose EDI format needs no web service.

        The matching documents are grouped into jobs by `_prepare_jobs` and each job is
        processed right away, in order.
        """
        offline_documents = self.filtered(lambda document: not document.edi_format_id._needs_web_services())
        for job in offline_documents._prepare_jobs():
            self._process_job(job)

    def _process_documents_web_services(self, job_count=None, with_commit=True):
        ''' Post and cancel all the documents that need a web service.

        Before a job is processed, the rows of its documents, of their moves and of the
        attachments that might be unlinked are locked with FOR UPDATE NOWAIT. When a lock
        is not available, the job is skipped (or a UserError is raised when with_commit
        is False).

        :param job_count:   The maximum number of jobs to process if specified. A falsy value
                            (None or 0) means that all the jobs are processed.
        :param with_commit: Flag indicating a commit should be made between each job. The commit
                            only happens when more than one job is processed by this call.
        :return:            The number of remaining jobs to process, i.e. the prepared jobs left
                            out because of job_count. Jobs skipped because their rows were locked
                            are not counted.
        :raises UserError:  if the rows are locked by another transaction and with_commit is False.
        '''
        all_jobs = self.filtered(lambda d: d.edi_format_id._needs_web_services())._prepare_jobs()
        jobs_to_process = all_jobs[0:job_count] if job_count else all_jobs

        for job in jobs_to_process:
            documents = job['documents']
            move_to_lock = documents.move_id
            # NOTE(review): the post-processing done by `_process_job` unlinks the old attachments missing
            # res_model OR res_id, while this filter only keeps the ones missing both - confirm that the
            # narrower set of locked attachments is intended.
            attachments_potential_unlink = documents.sudo().attachment_id.filtered(lambda a: not a.res_model and not a.res_id)
            try:
                # The savepoint keeps a failed NOWAIT lock attempt from aborting the whole transaction.
                with self.env.cr.savepoint(flush=False):
                    self._cr.execute('SELECT * FROM account_edi_document WHERE id IN %s FOR UPDATE NOWAIT', [tuple(documents.ids)])
                    self._cr.execute('SELECT * FROM account_move WHERE id IN %s FOR UPDATE NOWAIT', [tuple(move_to_lock.ids)])

                    # Locks the attachments that might be unlinked
                    if attachments_potential_unlink:
                        self._cr.execute('SELECT * FROM ir_attachment WHERE id IN %s FOR UPDATE NOWAIT', [tuple(attachments_potential_unlink.ids)])

            except psycopg2.errors.LockNotAvailable:
                _logger.debug('Another transaction already locked documents rows. Cannot process documents.')
                if not with_commit:
                    # `from None` hides the low-level database error from the traceback.
                    raise UserError(_('This document is being sent by another process already. ')) from None
                # Skip this job and carry on with the next one.
                continue
            self._process_job(job)
            if with_commit and len(jobs_to_process) > 1:
                self.env.cr.commit()

        return len(all_jobs) - len(jobs_to_process)

    @api.model
    def _cron_process_documents_web_services(self, job_count=None):
        """Method called by the EDI cron processing all web-services.

        Every document in state 'to_send' or 'to_cancel' that belongs to a posted move is
        searched and processed through `_process_documents_web_services`.

        :param job_count: Explicit limit on the number of jobs to process. If not provided, process all.
        """
        pending_domain = [
            ('state', 'in', ('to_send', 'to_cancel')),
            ('move_id.state', '=', 'posted'),
        ]
        pending_documents = self.search(pending_domain)
        remaining_jobs = pending_documents._process_documents_web_services(job_count=job_count)

        # Some jobs are left: ask for the cron to be triggered again.
        if remaining_jobs > 0:
            self.env.ref('account_edi.ir_cron_edi_network')._trigger()

    def _filter_edi_attachments_for_mailing(self):
        """Tell how the attachment of this single EDI document must be added to a mail.

        Can be overridden where e.g. a zip-file needs to be sent with the individual files
        instead of the entire zip.

        Nothing is returned when the document has no attachment or when its attachment is
        not linked to a record (both res_model and res_id are required). Otherwise:

        * when the context holds more than one id in 'active_ids', the name and the datas
          of the attachment are returned under 'attachments';
        * in any other case, the id of the existing attachment is returned under
          'attachment_ids'.

        :return: an empty dict, or a dict holding exactly one of
            'attachments': list with one (name, datas) tuple of the attachment
            'attachment_ids': list containing the id of the attachment
        """
        self.ensure_one()
        # attachment_id is restricted to system users.
        attachment = self.sudo().attachment_id

        # No attachment at all, or a system attachment not linked to a record: nothing to mail.
        if not attachment or not attachment.res_model or not attachment.res_id:
            return {}

        active_ids = self._context.get('active_ids', [])
        if len(active_ids) > 1:
            # In mass mail mode 'attachments_ids' is removed from template values
            # as they should not be rendered
            return {'attachments': [(attachment.name, attachment.datas)]}

        return {'attachment_ids': attachment.ids}