# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import io import logging import xml.dom.minidom import zipfile from odoo import api, models from odoo.tools.lru import LRU _logger = logging.getLogger(__name__) try: from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter from pdfminer.converter import TextConverter from pdfminer.pdfpage import PDFPage except ImportError: PDFResourceManager = PDFPageInterpreter = TextConverter = PDFPage = None _logger.warning("Attachment indexation of PDF documents is unavailable because the 'pdfminer' Python library cannot be found on the system. " "You may install it from https://pypi.org/project/pdfminer.six/ (e.g. `pip3 install pdfminer.six`)") FTYPES = ['docx', 'pptx', 'xlsx', 'opendoc', 'pdf'] index_content_cache = LRU(1) def textToString(element): buff = u"" for node in element.childNodes: if node.nodeType == xml.dom.Node.TEXT_NODE: buff += node.nodeValue elif node.nodeType == xml.dom.Node.ELEMENT_NODE: buff += textToString(node) return buff class IrAttachment(models.Model): _inherit = 'ir.attachment' def _index_docx(self, bin_data): '''Index Microsoft .docx documents''' buf = u"" f = io.BytesIO(bin_data) if zipfile.is_zipfile(f): try: zf = zipfile.ZipFile(f) content = xml.dom.minidom.parseString(zf.read("word/document.xml")) for val in ["w:p", "w:h", "text:list"]: for element in content.getElementsByTagName(val): buf += textToString(element) + "\n" except Exception: pass return buf def _index_pptx(self, bin_data): '''Index Microsoft .pptx documents''' buf = u"" f = io.BytesIO(bin_data) if zipfile.is_zipfile(f): try: zf = zipfile.ZipFile(f) zf_filelist = [x for x in zf.namelist() if x.startswith('ppt/slides/slide')] for i in range(1, len(zf_filelist) + 1): content = xml.dom.minidom.parseString(zf.read('ppt/slides/slide%s.xml' % i)) for val in ["a:t"]: for element in content.getElementsByTagName(val): buf += textToString(element) + "\n" except Exception: pass return buf def _index_xlsx(self, bin_data): '''Index Microsoft .xlsx documents''' buf = u"" f = io.BytesIO(bin_data) if zipfile.is_zipfile(f): try: zf = zipfile.ZipFile(f) content = xml.dom.minidom.parseString(zf.read("xl/sharedStrings.xml")) for val in ["t"]: for element in content.getElementsByTagName(val): buf += textToString(element) + "\n" except Exception: pass return buf def _index_opendoc(self, bin_data): '''Index OpenDocument documents (.odt, .ods...)''' buf = u"" f = io.BytesIO(bin_data) if zipfile.is_zipfile(f): try: zf = zipfile.ZipFile(f) content = xml.dom.minidom.parseString(zf.read("content.xml")) for val in ["text:p", "text:h", "text:list"]: for element in content.getElementsByTagName(val): buf += textToString(element) + "\n" except Exception: pass return buf def _index_pdf(self, bin_data): '''Index PDF documents''' if PDFResourceManager is None: return buf = u"" if bin_data.startswith(b'%PDF-'): f = io.BytesIO(bin_data) try: resource_manager = PDFResourceManager() with io.StringIO() as content, TextConverter(resource_manager, content) as device: logging.getLogger("pdfminer").setLevel(logging.CRITICAL) interpreter = PDFPageInterpreter(resource_manager, device) for page in PDFPage.get_pages(f): interpreter.process_page(page) buf = content.getvalue() except Exception: pass return buf @api.model def _index(self, bin_data, mimetype, checksum=None): if checksum: cached_content = index_content_cache.get(checksum) if cached_content: return cached_content res = False for ftype in FTYPES: buf = getattr(self, '_index_%s' % ftype)(bin_data) if buf: res = buf.replace('\x00', '') break res = res or super(IrAttachment, self)._index(bin_data, mimetype, checksum=checksum) if checksum: index_content_cache[checksum] = res return res def copy(self, default=None): for attachment in self: index_content_cache[attachment.checksum] = attachment.index_content return super().copy(default=default)