import base64
import logging
import io
from lxml import etree
from xml.sax.saxutils import escape, quoteattr
from odoo import _, api, fields, models, tools, SUPERUSER_ID
from odoo.tools import cleanup_xml_node
from odoo.tools.pdf import OdooPdfFileReader, OdooPdfFileWriter
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
class AccountMoveSend(models.AbstractModel):
    """Extend ``account.move.send`` with UBL/CII electronic invoice handling.

    The overrides below add alerts about incomplete Peppol / bank data,
    expose the generated XML as an extra attachment, build the XML before the
    PDF report is rendered, and post-process both documents afterwards (PDF
    embedded in the UBL XML, Factur-X XML embedded in the PDF).
    """
    _inherit = 'account.move.send'

    # -------------------------------------------------------------------------
    # ALERTS
    # -------------------------------------------------------------------------

    def _get_alerts(self, moves, moves_data):
        """Add UBL/CII related alerts to the ones computed by the parent.

        :param moves: recordset of moves being sent.
        :param moves_data: mapping ``move -> dict``; the key
            ``'invoice_edi_format'`` of each dict is read here.
        :return: the alerts dict from ``super()``, possibly enriched with the
            keys ``account_edi_ubl_cii_configure_company``,
            ``account_edi_ubl_cii_configure_partner`` and
            ``account_edi_ubl_cii_configure_bank``.
        """
        # EXTENDS 'account'
        alerts = super()._get_alerts(moves, moves_data)

        def lacks_peppol_address(partner):
            # A partner is considered configured only when both values are set.
            return not (partner.peppol_eas and partner.peppol_endpoint)

        peppol_formats = set(self.env['res.partner']._get_peppol_formats())
        if peppol_format_moves := moves.filtered(lambda m: moves_data[m]['invoice_edi_format'] in peppol_formats):
            # Sender side: the partner attached to each company.
            not_configured_company_partners = peppol_format_moves.company_id.partner_id.filtered(lacks_peppol_address)
            if not_configured_company_partners:
                alerts['account_edi_ubl_cii_configure_company'] = {
                    'message': _("Please fill in Peppol EAS and Peppol Endpoint in your company form to generate a complete file."),
                    'level': 'info',
                    'action_text': _("View Company"),
                    'action': not_configured_company_partners._get_records_action(),
                }

            # Receiver side: the commercial partner of each invoice partner.
            not_configured_partners = peppol_format_moves.partner_id.commercial_partner_id.filtered(lacks_peppol_address)
            if not_configured_partners:
                alerts['account_edi_ubl_cii_configure_partner'] = {
                    'message': _("These partners are missing Peppol Address. "
                                 "Please check those in their Accounting tab. "
                                 "Otherwise, the generated files will be incomplete."),
                    'level': 'info',
                    'action_text': _("View Partner(s)"),
                    'action': not_configured_partners._get_records_action(name=_("Check Partner(s)")),
                }

        ubl_formats = set(self.env['res.partner']._get_ubl_cii_formats())
        if moves_without_bank := moves.filtered(
            lambda m: m.move_type == 'out_invoice'
            and moves_data[m]['invoice_edi_format'] in ubl_formats
            and not m.partner_bank_id
        ):
            alerts['account_edi_ubl_cii_configure_bank'] = {
                'message': _("Please add a Recipient bank in the 'Other Info' tab to generate a complete file."),
                # A single offending invoice is reported as 'danger', several as 'warning'.
                'level': 'danger' if len(moves_without_bank) == 1 else 'warning',
                'action_text': _("View Invoice(s)"),
                'action': moves_without_bank._get_records_action(name=_("Check Invoice(s)")),
            }
        return alerts

    # -------------------------------------------------------------------------
    # ATTACHMENTS
    # -------------------------------------------------------------------------

    def _get_invoice_extra_attachments(self, move):
        """Return the parent's extra attachments plus ``move.ubl_cii_xml_id``."""
        # EXTENDS 'account'
        return super()._get_invoice_extra_attachments(move) + move.ubl_cii_xml_id

    def _get_placeholder_mail_attachments_data(self, move, extra_edis=None):
        """Append a placeholder entry for the UBL/CII XML when one is needed.

        :param move: the move for which mail attachments are previewed.
        :param extra_edis: forwarded unchanged to ``super()``.
        :return: list of attachment-description dicts; a placeholder named
            after the builder's export filename is appended when
            ``move._need_ubl_cii_xml`` is truthy for the default EDI format.
        """
        # EXTENDS 'account'
        results = super()._get_placeholder_mail_attachments_data(move, extra_edis=extra_edis)

        partner_edi_format = self._get_default_invoice_edi_format(move)
        if move._need_ubl_cii_xml(partner_edi_format):
            builder = move.partner_id.commercial_partner_id._get_edi_builder(partner_edi_format)
            filename = builder._export_invoice_filename(move)
            results.append({
                'id': f'placeholder_{filename}',
                'name': filename,
                'mimetype': 'application/xml',
                'placeholder': True,
            })
        return results

    # -------------------------------------------------------------------------
    # BUSINESS ACTIONS
    # -------------------------------------------------------------------------

    def _hook_invoice_document_before_pdf_report_render(self, invoice, invoice_data):
        """Build the UBL/CII XML for ``invoice`` before its PDF is rendered.

        On export errors, ``invoice_data['error']`` is filled and
        ``invoice_data['error_but_continue']`` is set to ``True``. Otherwise
        the attachment values are stored under
        ``'ubl_cii_xml_attachment_values'`` and the format/builder pair under
        ``'ubl_cii_xml_options'``.

        :param invoice: the move being processed.
        :param invoice_data: mutable dict for this move; its
            ``'invoice_edi_format'`` key is read and the keys above written.
        """
        # EXTENDS 'account'
        super()._hook_invoice_document_before_pdf_report_render(invoice, invoice_data)

        edi_format = invoice_data['invoice_edi_format']
        if not invoice._need_ubl_cii_xml(edi_format):
            return

        builder = invoice.partner_id.commercial_partner_id._get_edi_builder(edi_format)
        xml_content, errors = builder._export_invoice(invoice)
        filename = builder._export_invoice_filename(invoice)

        # Failed.
        if errors:
            invoice_data['error'] = {
                'error_title': _("Errors occurred while creating the EDI document (format: %s):", builder._description),
                'errors': errors,
            }
            invoice_data['error_but_continue'] = True
            return

        invoice_data['ubl_cii_xml_attachment_values'] = {
            'name': filename,
            'raw': xml_content,
            'mimetype': 'application/xml',
            'res_model': invoice._name,
            'res_id': invoice.id,
            'res_field': 'ubl_cii_xml_file',  # Binary field
        }
        invoice_data['ubl_cii_xml_options'] = {
            'ubl_cii_format': edi_format,
            'builder': builder,
        }

    def _hook_invoice_document_after_pdf_report_render(self, invoice, invoice_data):
        """Cross-embed the PDF and the XML once the PDF report exists.

        Steps, in order:

        1. For a non Factur-X XML, embed the PDF into the XML through
           ``_postprocess_invoice_ubl_xml``.
        2. Get a Factur-X XML: the one already generated when the chosen
           format is Factur-X, a freshly exported one otherwise.
        3. In test mode, store that XML as a plain attachment and stop.
        4. Otherwise attach it to the PDF as ``factur-x.xml``; when the chosen
           format is Factur-X and the writer is not already PDF/A, attempt a
           PDF/A conversion and add the Factur-X metadata.

        :param invoice: the move being processed.
        :param invoice_data: mutable dict for this move; the PDF ``'raw'``
            content it refers to is replaced by the post-processed PDF.
        """
        # EXTENDS 'account'
        super()._hook_invoice_document_after_pdf_report_render(invoice, invoice_data)

        # Add PDF to XML
        if 'ubl_cii_xml_options' in invoice_data and invoice_data['ubl_cii_xml_options']['ubl_cii_format'] != 'facturx':
            self._postprocess_invoice_ubl_xml(invoice, invoice_data)

        is_facturx = invoice_data.get('ubl_cii_xml_options', {}).get('ubl_cii_format') == 'facturx'

        # Always silently generate a Factur-X and embed it inside the PDF for inter-portability
        if is_facturx:
            xml_facturx = invoice_data['ubl_cii_xml_attachment_values']['raw']
        else:
            xml_facturx = self.env['account.edi.xml.cii']._export_invoice(invoice)[0]

        # during tests, no wkhtmltopdf, create the attachment for test purposes
        if tools.config['test_enable']:
            self.env['ir.attachment'].create({
                'name': 'factur-x.xml',
                'raw': xml_facturx,
                'res_id': invoice.id,
                'res_model': 'account.move',
            })
            return

        # Read pdf content.
        pdf_values = invoice.invoice_pdf_report_id or invoice_data.get('pdf_attachment_values') or invoice_data['proforma_pdf_attachment_values']

        # The context manager closes both buffers even if the rewrite raises.
        with io.BytesIO(pdf_values['raw']) as reader_buffer, io.BytesIO() as writer_buffer:
            reader = OdooPdfFileReader(reader_buffer, strict=False)

            # Post-process.
            writer = OdooPdfFileWriter()
            writer.cloneReaderDocumentRoot(reader)
            writer.addAttachment('factur-x.xml', xml_facturx, subtype='text/xml')

            # PDF-A.
            if is_facturx and not writer.is_pdfa:
                try:
                    writer.convert_to_pdfa()
                except Exception:
                    # Best effort: a failed conversion is logged, the PDF is still produced.
                    _logger.exception("Error while converting to PDF/A")

                # Extra metadata to be Factur-x PDF-A compliant.
                content = self.env['ir.qweb']._render(
                    'account_edi_ubl_cii.account_invoice_pdfa_3_facturx_metadata',
                    {
                        'title': invoice.name,
                        'date': fields.Date.context_today(self),
                    },
                )
                writer.add_file_metadata(content.encode())

            # Replace the current content.
            writer.write(writer_buffer)
            pdf_values['raw'] = writer_buffer.getvalue()

    @api.model
    def _postprocess_invoice_ubl_xml(self, invoice, invoice_data):
        """Embed the invoice PDF into the generated UBL XML.

        A ``cac:AdditionalDocumentReference`` element carrying the
        base64-encoded PDF is inserted right before the first
        ``AccountingSupplierParty`` element, and the serialized result
        replaces ``invoice_data['ubl_cii_xml_attachment_values']['raw']``.
        Nothing is changed when no ``AccountingSupplierParty`` element exists.

        :param invoice: the move whose PDF is embedded.
        :param invoice_data: mutable dict for this move; reads
            ``'ubl_cii_xml_attachment_values'``, ``'ubl_cii_xml_options'``
            and the PDF values, and rewrites the XML ``'raw'`` content.
        """
        # Adding the PDF to the XML
        tree = etree.fromstring(invoice_data['ubl_cii_xml_attachment_values']['raw'])
        anchor_elements = tree.xpath("//*[local-name()='AccountingSupplierParty']")
        if not anchor_elements:
            return

        # Default namespace of the injected node follows the document type.
        xmlns_move_type = 'Invoice' if invoice.move_type == 'out_invoice' else 'CreditNote'
        pdf_values = invoice.invoice_pdf_report_id or invoice_data.get('pdf_attachment_values') or invoice_data['proforma_pdf_attachment_values']
        filename = pdf_values['name']
        content = pdf_values['raw']

        # Optional document type code, provided by the EDI builder.
        doc_type_node = ""
        edi_model = invoice_data["ubl_cii_xml_options"]["builder"]
        doc_type_code_vals = edi_model._get_document_type_code_vals(invoice, invoice_data)
        if doc_type_code_vals['value']:
            # quoteattr()/escape() keep the fragment well-formed whatever the values contain.
            doc_type_code_attrs = " ".join(
                f'{name}={quoteattr(str(value))}'
                for name, value in doc_type_code_vals['attrs'].items()
            )
            doc_type_node = (
                f"<cbc:DocumentTypeCode {doc_type_code_attrs}>"
                f"{escape(str(doc_type_code_vals['value']))}"
                "</cbc:DocumentTypeCode>"
            )

        to_inject = f'''
            <cac:AdditionalDocumentReference
                xmlns="urn:oasis:names:specification:ubl:schema:xsd:{xmlns_move_type}-2"
                xmlns:cbc="urn:oasis:names:specification:ubl:schema:xsd:CommonBasicComponents-2"
                xmlns:cac="urn:oasis:names:specification:ubl:schema:xsd:CommonAggregateComponents-2">
                <cbc:ID>{escape(filename)}</cbc:ID>
                {doc_type_node}
                <cac:Attachment>
                    <cbc:EmbeddedDocumentBinaryObject
                        mimeCode="application/pdf"
                        filename={quoteattr(filename)}>
                        {base64.b64encode(content).decode()}
                    </cbc:EmbeddedDocumentBinaryObject>
                </cac:Attachment>
            </cac:AdditionalDocumentReference>
        '''

        anchor_index = tree.index(anchor_elements[0])
        tree.insert(anchor_index, etree.fromstring(to_inject))
        invoice_data['ubl_cii_xml_attachment_values']['raw'] = etree.tostring(
            cleanup_xml_node(tree), xml_declaration=True, encoding='UTF-8'
        )

    def _link_invoice_documents(self, invoices_data):
        """Create the UBL/CII XML attachments prepared for each invoice.

        :param invoices_data: mapping whose values are per-invoice dicts; every
            truthy ``'ubl_cii_xml_attachment_values'`` entry becomes an
            ``ir.attachment`` record, created as superuser.
        """
        # EXTENDS 'account'
        super()._link_invoice_documents(invoices_data)

        attachments_vals = [
            values
            for invoice_data in invoices_data.values()
            if (values := invoice_data.get('ubl_cii_xml_attachment_values'))
        ]
        if attachments_vals:
            attachments = self.env['ir.attachment'].with_user(SUPERUSER_ID).create(attachments_vals)
            res_ids = attachments.mapped('res_id')
            # Drop cached values of the fields backed by the new attachments.
            self.env['account.move'].browse(res_ids).invalidate_recordset(fnames=['ubl_cii_xml_id', 'ubl_cii_xml_file'])