1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import io
import logging
import xml.dom.minidom
import zipfile
from odoo import api, models
from odoo.tools.lru import LRU
_logger = logging.getLogger(__name__)

# pdfminer is an optional dependency: when it cannot be imported, the names
# below are bound to None (``_index_pdf`` tests ``PDFResourceManager is None``
# before doing any work) and a warning is emitted once, at import time.
try:
    from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
    from pdfminer.converter import TextConverter
    from pdfminer.pdfpage import PDFPage
except ImportError:
    PDFResourceManager = PDFPageInterpreter = TextConverter = PDFPage = None
    _logger.warning("Attachment indexation of PDF documents is unavailable because the 'pdfminer' Python library cannot be found on the system. "
                    "You may install it from https://pypi.org/project/pdfminer.six/ (e.g. `pip3 install pdfminer.six`)")

# Suffixes of the ``_index_<ftype>`` methods; ``IrAttachment._index`` tries
# them in this order and keeps the first non-empty result.
FTYPES = ['docx', 'pptx', 'xlsx', 'opendoc', 'pdf']

# Module-level cache of extracted text, keyed by attachment checksum.
# NOTE(review): ``LRU(1)`` presumably keeps only the most recent entry -
# confirm against odoo.tools.lru.
index_content_cache = LRU(1)
def textToString(element):
    '''Return the concatenated character data found under a DOM node.

    Child nodes are visited in document order. Text and CDATA nodes
    contribute their value, element nodes are descended into recursively,
    and every other node type (comments, processing instructions...) is
    ignored.

    :param element: DOM node whose ``childNodes`` are scanned
    :return: the collected text, or an empty string when there is none
    :rtype: str
    '''
    # minidom reports CDATA sections with their own node type, so they have
    # to be accepted explicitly or their character data is lost.
    text_types = (xml.dom.Node.TEXT_NODE, xml.dom.Node.CDATA_SECTION_NODE)
    parts = []
    for node in element.childNodes:
        if node.nodeType in text_types:
            parts.append(node.nodeValue)
        elif node.nodeType == xml.dom.Node.ELEMENT_NODE:
            parts.append(textToString(node))
    # A single join avoids the quadratic cost of repeated ``+=`` on strings.
    return u"".join(parts)
class IrAttachment(models.Model):
    '''Add text extraction for office documents and PDFs to ``ir.attachment``.

    ``_index`` tries each ``_index_<ftype>`` method named in ``FTYPES`` and
    keeps the first non-empty result. Every indexer is best-effort: it
    returns an empty string instead of raising when the data is not in the
    format it handles or cannot be parsed.
    '''
    _inherit = 'ir.attachment'

    def _index_docx(self, bin_data):
        '''Index Microsoft .docx documents.

        :param bytes bin_data: raw content of the attachment
        :return: one line of text per matched element of
            ``word/document.xml``, or an empty string
        :rtype: str
        '''
        parts = []
        f = io.BytesIO(bin_data)
        if zipfile.is_zipfile(f):
            try:
                with zipfile.ZipFile(f) as zf:
                    content = xml.dom.minidom.parseString(zf.read("word/document.xml"))
                for val in ["w:p", "w:h", "text:list"]:
                    for element in content.getElementsByTagName(val):
                        parts.append(textToString(element) + "\n")
            except KeyError:
                # The member is absent: the archive is simply not a .docx.
                # Expected, because ``_index`` probes every format in turn.
                pass
            except Exception:
                _logger.debug("Could not extract text from .docx content", exc_info=True)
        return u"".join(parts)

    def _index_pptx(self, bin_data):
        '''Index Microsoft .pptx documents.

        Slides are read from the ``ppt/slides/slide<N>.xml`` members that
        are actually present in the archive, in numeric order, so a gap in
        the numbering does not cut the extraction short.

        :param bytes bin_data: raw content of the attachment
        :return: one line of text per ``a:t`` element, or an empty string;
            text gathered before a parsing failure is kept
        :rtype: str
        '''
        parts = []
        f = io.BytesIO(bin_data)
        if zipfile.is_zipfile(f):
            prefix, suffix = 'ppt/slides/slide', '.xml'
            try:
                with zipfile.ZipFile(f) as zf:
                    slides = []
                    for name in zf.namelist():
                        if name.startswith(prefix) and name.endswith(suffix):
                            number = name[len(prefix):-len(suffix)]
                            if number.isdecimal():
                                slides.append((int(number), name))
                    # Sort on the slide number so that slide10 follows slide9.
                    for _number, name in sorted(slides):
                        content = xml.dom.minidom.parseString(zf.read(name))
                        for element in content.getElementsByTagName("a:t"):
                            parts.append(textToString(element) + "\n")
            except Exception:
                _logger.debug("Could not extract text from .pptx content", exc_info=True)
        return u"".join(parts)

    def _index_xlsx(self, bin_data):
        '''Index Microsoft .xlsx documents.

        :param bytes bin_data: raw content of the attachment
        :return: one line of text per ``t`` element of
            ``xl/sharedStrings.xml``, or an empty string
        :rtype: str
        '''
        parts = []
        f = io.BytesIO(bin_data)
        if zipfile.is_zipfile(f):
            try:
                with zipfile.ZipFile(f) as zf:
                    content = xml.dom.minidom.parseString(zf.read("xl/sharedStrings.xml"))
                for element in content.getElementsByTagName("t"):
                    parts.append(textToString(element) + "\n")
            except KeyError:
                # The member is absent: the archive is simply not an .xlsx.
                pass
            except Exception:
                _logger.debug("Could not extract text from .xlsx content", exc_info=True)
        return u"".join(parts)

    def _index_opendoc(self, bin_data):
        '''Index OpenDocument documents (.odt, .ods...).

        :param bytes bin_data: raw content of the attachment
        :return: one line of text per matched element of ``content.xml``,
            or an empty string
        :rtype: str
        '''
        parts = []
        f = io.BytesIO(bin_data)
        if zipfile.is_zipfile(f):
            try:
                with zipfile.ZipFile(f) as zf:
                    content = xml.dom.minidom.parseString(zf.read("content.xml"))
                for val in ["text:p", "text:h", "text:list"]:
                    for element in content.getElementsByTagName(val):
                        parts.append(textToString(element) + "\n")
            except KeyError:
                # The member is absent: the archive is not an OpenDocument.
                pass
            except Exception:
                _logger.debug("Could not extract text from OpenDocument content", exc_info=True)
        return u"".join(parts)

    def _index_pdf(self, bin_data):
        '''Index PDF documents.

        :param bytes bin_data: raw content of the attachment
        :return: the text produced by pdfminer, or an empty string when
            pdfminer is unavailable, the data does not start with the
            ``%PDF-`` marker, or extraction fails
        :rtype: str
        '''
        buf = u""
        if PDFResourceManager is None:
            # pdfminer could not be imported; stay consistent with the other
            # indexers and return an empty string rather than None.
            return buf
        if bin_data.startswith(b'%PDF-'):
            f = io.BytesIO(bin_data)
            try:
                resource_manager = PDFResourceManager()
                with io.StringIO() as content, TextConverter(resource_manager, content) as device:
                    # Only let critical pdfminer messages through while parsing.
                    logging.getLogger("pdfminer").setLevel(logging.CRITICAL)
                    interpreter = PDFPageInterpreter(resource_manager, device)
                    for page in PDFPage.get_pages(f):
                        interpreter.process_page(page)
                    # Must be read before the StringIO is closed by ``with``.
                    buf = content.getvalue()
            except Exception:
                _logger.debug("Could not extract text from PDF content", exc_info=True)
        return buf

    @api.model
    def _index(self, bin_data, mimetype, checksum=None):
        '''Return the indexable text of an attachment.

        A non-empty cached value stored under ``checksum`` is returned
        directly. Otherwise each indexer listed in ``FTYPES`` is tried in
        order and the first non-empty result is kept, with NUL characters
        removed. When none of them yields text, the parent implementation is
        used. The result is stored in the cache when a checksum is given.

        :param bytes bin_data: raw content of the attachment
        :param mimetype: only forwarded to the parent implementation
        :param checksum: optional cache key for the extracted text
        :return: the extracted text, or whatever the parent implementation
            returns
        '''
        if checksum:
            cached_content = index_content_cache.get(checksum)
            if cached_content:
                return cached_content
        res = False
        for ftype in FTYPES:
            buf = getattr(self, '_index_%s' % ftype)(bin_data)
            if buf:
                res = buf.replace('\x00', '')
                break
        res = res or super(IrAttachment, self)._index(bin_data, mimetype, checksum=checksum)
        if checksum:
            index_content_cache[checksum] = res
        return res

    def copy(self, default=None):
        '''Copy the attachments after seeding the index cache.

        The current ``index_content`` of each attachment is stored in the
        cache under its checksum before delegating to the parent ``copy``.
        NOTE(review): presumably this lets the copy reuse the text instead
        of re-extracting it in ``_index`` - confirm against the caller.

        :param default: forwarded unchanged to the parent ``copy``
        :return: the result of the parent ``copy``
        '''
        for attachment in self:
            index_content_cache[attachment.checksum] = attachment.index_content
        return super().copy(default=default)
|