File: ir_attachment.py

package info (click to toggle)
odoo 18.0.0%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 878,716 kB
  • sloc: javascript: 927,937; python: 685,670; xml: 388,524; sh: 1,033; sql: 415; makefile: 26
file content (170 lines) | stat: -rw-r--r-- 6,840 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# -*- coding: utf-8 -*-
from odoo import api, models
from odoo.tools.pdf import OdooPdfFileReader, PdfReadError

from lxml import etree
from struct import error as StructError
import io
import logging
import zipfile

_logger = logging.getLogger(__name__)


class IrAttachment(models.Model):
    _inherit = 'ir.attachment'

    def _build_zip_from_attachments(self):
        """ Return the zip bytes content resulting from compressing the attachments in `self`"""
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zipfile_obj:
            for attachment in self:
                zipfile_obj.writestr(attachment.display_name, attachment.raw)
        return buffer.getvalue()

    # -------------------------------------------------------------------------
    # EDI
    # -------------------------------------------------------------------------

    def _decode_edi_xml(self, filename, content):
        """Decodes an xml into a list of one dictionary representing an attachment.
        :returns:           A list with a dictionary.
        """
        try:
            xml_tree = etree.fromstring(content)
        except Exception as e:
            _logger.info('Error when reading the xml file "%s": %s', filename, e)
            return []

        to_process = []
        if xml_tree is not None:
            to_process.append({
                'attachment': self,
                'filename': filename,
                'content': content,
                'xml_tree': xml_tree,
                'sort_weight': 10,
                'type': 'xml',
            })
        return to_process

    def _decode_edi_pdf(self, filename, content):
        """Decodes a pdf and unwrap sub-attachment into a list of dictionary each representing an attachment.
        :returns:           A list of dictionary for each attachment.
        """
        try:
            buffer = io.BytesIO(content)
            pdf_reader = OdooPdfFileReader(buffer, strict=False)
        except Exception as e:
            # Malformed pdf
            _logger.info('Error when reading the pdf file "%s": %s', filename, e)
            return []

        # Process embedded files.
        to_process = []
        try:
            for xml_name, xml_content in pdf_reader.getAttachments():
                embedded_files = self.env['ir.attachment']._decode_edi_xml(xml_name, xml_content)
                for file_data in embedded_files:
                    file_data['sort_weight'] += 1
                    file_data['originator_pdf'] = self
                to_process.extend(embedded_files)
        except (NotImplementedError, StructError, PdfReadError) as e:
            _logger.warning("Unable to access the attachments of %s. Tried to decrypt it, but %s.", filename, e)

        # Process the pdf itself.
        to_process.append({
            'filename': filename,
            'content': content,
            'pdf_reader': pdf_reader,
            'attachment': self,
            'on_close': buffer.close,
            'sort_weight': 20,
            'type': 'pdf',
        })

        return to_process

    def _decode_edi_binary(self, filename, content):
        """Decodes any file into a list of one dictionary representing an attachment.
        This is a fallback for all files that are not decoded by other methods.
        :returns:           A list with a dictionary.
        """
        return [{
            'filename': filename,
            'content': content,
            'attachment': self,
            'sort_weight': 100,
            'type': 'binary',
        }]

    @api.model
    def _get_edi_supported_formats(self):
        """Get the list of supported formats.
        This function is meant to be overriden to add formats.

        :returns:           A list of dictionary.

        * format:           Optional but helps debugging.
                            There are other methods that require the attachment
                            to be an XML other than the standard one.
        * check:            Function to be called on the attachment to pre-check if decoding will work.
        * decoder:          Function to be called on the attachment to unwrap it.
        """

        def is_xml(attachment):
            # XML attachments received by mail have a 'text/plain' mimetype (cfr. context key:
            # 'attachments_mime_plainxml'). Therefore, if content start with '<?xml', or if the filename ends with
            # '.xml', it is considered as XML.
            is_text_plain_xml = 'text/plain' in attachment.mimetype and (attachment.raw and attachment.raw.startswith(b'<?xml') or attachment.name.endswith('.xml'))
            return attachment.mimetype.endswith('/xml') or is_text_plain_xml

        return [
            {
                'format': 'pdf',
                'check': lambda attachment: 'pdf' in attachment.mimetype,
                'decoder': self._decode_edi_pdf,
            },
            {
                'format': 'xml',
                'check': is_xml,
                'decoder': self._decode_edi_xml,
            },
            {
                'format': 'binary',
                'check': lambda attachment: True,
                'decoder': self._decode_edi_binary,
            },
        ]

    def _unwrap_edi_attachments(self):
        """Decodes ir.attachment and unwrap sub-attachment into a sorted list of
        dictionary each representing an attachment.

        :returns:           A list of dictionary for each attachment.
        * filename:         The name of the attachment.
        * content:          The content of the attachment.
        * type:             The type of the attachment.
        * xml_tree:         The tree of the xml if type is xml.
        * pdf_reader:       The pdf_reader if type is pdf.
        * attachment:       The associated ir.attachment if any
        * sort_weight:      The associated weigth used for sorting the arrays
        """
        to_process = []

        for attachment in self:
            supported_formats = attachment._get_edi_supported_formats()
            for supported_format in supported_formats:
                if supported_format['check'](attachment):
                    to_process += supported_format['decoder'](attachment.name, attachment.raw)

        to_process.sort(key=lambda x: x['sort_weight'])

        return to_process

    def _post_add_create(self, **kwargs):
        move_attachments = self.filtered(lambda attachment: attachment.res_model == 'account.move')
        moves_per_id = self.env['account.move'].browse([attachment.res_id for attachment in move_attachments]).grouped('id')
        for attachment in move_attachments:
            moves_per_id[attachment.res_id]._check_and_decode_attachment(attachment)
        super()._post_add_create(**kwargs)