Odoo18-Base/addons/account/models/ir_attachment.py
2025-01-06 10:57:38 +07:00

172 lines
6.7 KiB
Python

# -*- coding: utf-8 -*-
from odoo import api, models
from odoo.tools.pdf import OdooPdfFileReader, PdfReadError
from odoo.tools.mimetypes import guess_mimetype
from lxml import etree
from struct import error as StructError
import io
import logging
import zipfile
_logger = logging.getLogger(__name__)
class IrAttachment(models.Model):
_inherit = 'ir.attachment'
def _build_zip_from_attachments(self):
""" Return the zip bytes content resulting from compressing the attachments in `self`"""
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zipfile_obj:
for attachment in self:
zipfile_obj.writestr(attachment.display_name, attachment.raw)
return buffer.getvalue()
# -------------------------------------------------------------------------
# EDI
# -------------------------------------------------------------------------
def _decode_edi_xml(self, filename, content):
"""Decodes an xml into a list of one dictionary representing an attachment.
:returns: A list with a dictionary.
"""
try:
xml_tree = etree.fromstring(content)
except Exception as e:
_logger.info('Error when reading the xml file "%s": %s', filename, e)
return []
to_process = []
if xml_tree is not None:
to_process.append({
'attachment': self,
'filename': filename,
'content': content,
'xml_tree': xml_tree,
'sort_weight': 10,
'type': 'xml',
})
return to_process
def _decode_edi_pdf(self, filename, content):
"""Decodes a pdf and unwrap sub-attachment into a list of dictionary each representing an attachment.
:returns: A list of dictionary for each attachment.
"""
try:
buffer = io.BytesIO(content)
pdf_reader = OdooPdfFileReader(buffer, strict=False)
except Exception as e:
# Malformed pdf
_logger.info('Error when reading the pdf file "%s": %s', filename, e)
return []
# Process embedded files.
to_process = []
try:
for xml_name, xml_content in pdf_reader.getAttachments():
embedded_files = self.env['ir.attachment']._decode_edi_xml(xml_name, xml_content)
for file_data in embedded_files:
file_data['sort_weight'] += 1
file_data['originator_pdf'] = self
to_process.extend(embedded_files)
except (NotImplementedError, StructError, PdfReadError) as e:
_logger.warning("Unable to access the attachments of %s. Tried to decrypt it, but %s.", filename, e)
# Process the pdf itself.
to_process.append({
'filename': filename,
'content': content,
'pdf_reader': pdf_reader,
'attachment': self,
'on_close': buffer.close,
'sort_weight': 20,
'type': 'pdf',
})
return to_process
def _decode_edi_binary(self, filename, content):
"""Decodes any file into a list of one dictionary representing an attachment.
This is a fallback for all files that are not decoded by other methods.
:returns: A list with a dictionary.
"""
return [{
'filename': filename,
'content': content,
'attachment': self,
'sort_weight': 100,
'type': 'binary',
}]
@api.model
def _get_edi_supported_formats(self):
"""Get the list of supported formats.
This function is meant to be overriden to add formats.
:returns: A list of dictionary.
* format: Optional but helps debugging.
There are other methods that require the attachment
to be an XML other than the standard one.
* check: Function to be called on the attachment to pre-check if decoding will work.
* decoder: Function to be called on the attachment to unwrap it.
"""
def is_xml(attachment):
# XML attachments received by mail have a 'text/plain' mimetype (cfr. context key:
# 'attachments_mime_plainxml'). Therefore, if content start with '<?xml', or if the filename ends with
# '.xml', it is considered as XML.
is_text_plain_xml = 'text/plain' in attachment.mimetype and (guess_mimetype(attachment.raw).endswith('/xml') or attachment.name.endswith('.xml'))
return attachment.mimetype.endswith('/xml') or is_text_plain_xml
return [
{
'format': 'pdf',
'check': lambda attachment: 'pdf' in attachment.mimetype,
'decoder': self._decode_edi_pdf,
},
{
'format': 'xml',
'check': is_xml,
'decoder': self._decode_edi_xml,
},
{
'format': 'binary',
'check': lambda attachment: True,
'decoder': self._decode_edi_binary,
},
]
def _unwrap_edi_attachments(self):
"""Decodes ir.attachment and unwrap sub-attachment into a sorted list of
dictionary each representing an attachment.
:returns: A list of dictionary for each attachment.
* filename: The name of the attachment.
* content: The content of the attachment.
* type: The type of the attachment.
* xml_tree: The tree of the xml if type is xml.
* pdf_reader: The pdf_reader if type is pdf.
* attachment: The associated ir.attachment if any
* sort_weight: The associated weigth used for sorting the arrays
"""
to_process = []
for attachment in self:
supported_formats = attachment._get_edi_supported_formats()
for supported_format in supported_formats:
if supported_format['check'](attachment):
to_process += supported_format['decoder'](attachment.name, attachment.raw)
to_process.sort(key=lambda x: x['sort_weight'])
return to_process
def _post_add_create(self, **kwargs):
move_attachments = self.filtered(lambda attachment: attachment.res_model == 'account.move')
moves_per_id = self.env['account.move'].browse([attachment.res_id for attachment in move_attachments]).grouped('id')
for attachment in move_attachments:
moves_per_id[attachment.res_id]._check_and_decode_attachment(attachment)
super()._post_add_create(**kwargs)