479 lines
24 KiB
Python
479 lines
24 KiB
Python
import json
|
|
from hashlib import sha256
|
|
from base64 import b64decode, b64encode
|
|
from lxml import etree
|
|
from datetime import datetime
|
|
from odoo import models, fields, _, api
|
|
from odoo.exceptions import UserError
|
|
from odoo.tools import format_list
|
|
|
|
|
|
class AccountEdiFormat(models.Model):
|
|
_inherit = 'account.edi.format'
|
|
|
|
"""
|
|
Once the journal has been successfully onboarded, we can clear/report invoices through the ZATCA API:
|
|
A) STANDARD Invoice:
|
|
Make a call to the Clearance API '/invoices/clearance/single'.
|
|
This will validate the invoice, sign it and apply a QR code then return the result.
|
|
B) SIMPLIFIED Invoice:
|
|
Make a call to the Reporting API '/invoices/reporting/single'.
|
|
This will validate the invoice then return the result.
|
|
The X509 Certificate and password from the PCSID API need to be provided in the request headers.
|
|
"""
|
|
|
|
# ====== Helper Functions =======
|
|
|
|
def _l10n_sa_get_zatca_datetime(self, timestamp):
|
|
return fields.Datetime.context_timestamp(self.with_context(tz='Asia/Riyadh'), timestamp)
|
|
|
|
def _l10n_sa_xml_node_content(self, root, xpath, namespaces=None):
|
|
namespaces = namespaces or self.env['account.edi.xml.ubl_21.zatca']._l10n_sa_get_namespaces()
|
|
return etree.tostring(root.xpath(xpath, namespaces=namespaces)[0], with_tail=False,
|
|
encoding='utf-8', method='xml')
|
|
|
|
# ====== Xades Signing =======
|
|
|
|
@api.model
|
|
def _l10n_sa_get_digital_signature(self, company_id, invoice_hash):
|
|
"""
|
|
Generate an ECDSA SHA256 digital signature for the XML eInvoice
|
|
"""
|
|
decoded_hash = b64decode(invoice_hash).decode()
|
|
return company_id.sudo().l10n_sa_private_key_id._sign(decoded_hash, formatting='base64')
|
|
|
|
def _l10n_sa_calculate_signed_properties_hash(self, issuer_name, serial_number, signing_time, public_key):
|
|
"""
|
|
Calculate the SHA256 value of the SignedProperties XML node. The algorithm used by ZATCA expects the indentation
|
|
of the nodes to start with 40 spaces, except for the root SignedProperties node.
|
|
"""
|
|
signed_properties = etree.fromstring(self.env['ir.qweb']._render('l10n_sa_edi.export_sa_zatca_ubl_signed_properties', {
|
|
'issuer_name': issuer_name,
|
|
'serial_number': serial_number,
|
|
'signing_time': signing_time,
|
|
'public_key_hashing': public_key,
|
|
}))
|
|
etree.indent(signed_properties, space=' ')
|
|
signed_properties_split = etree.tostring(signed_properties).decode().split('\n')
|
|
signed_properties_final = ""
|
|
for index, line in enumerate(signed_properties_split):
|
|
if index == 0:
|
|
signed_properties_final += line
|
|
else:
|
|
signed_properties_final += (' ' * 36) + line
|
|
if index != len(signed_properties_final) - 1:
|
|
signed_properties_final += '\n'
|
|
signed_properties_final = etree.tostring(etree.fromstring(signed_properties_final))
|
|
return b64encode(sha256(signed_properties_final).hexdigest().encode()).decode()
|
|
|
|
def _l10n_sa_sign_xml(self, xml_content, certificate, signature):
|
|
"""
|
|
Function that signs XML content of a UBL document with a provided B64 encoded X509 certificate
|
|
"""
|
|
root = etree.fromstring(xml_content)
|
|
etree.indent(root, space=' ')
|
|
|
|
def _set_content(xpath, content):
|
|
node = root.xpath(xpath)[0]
|
|
node.text = content
|
|
|
|
der_cert = certificate._get_der_certificate_bytes(formatting='base64')
|
|
|
|
issuer_name = certificate._l10n_sa_get_issuer_name()
|
|
serial_number = certificate.serial_number
|
|
signing_time = self._l10n_sa_get_zatca_datetime(datetime.now()).strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
public_key_hashing = b64encode(sha256(der_cert).hexdigest().encode()).decode()
|
|
|
|
signed_properties_hash = self._l10n_sa_calculate_signed_properties_hash(issuer_name, serial_number,
|
|
signing_time, public_key_hashing)
|
|
|
|
_set_content("//*[local-name()='X509IssuerName']", issuer_name)
|
|
_set_content("//*[local-name()='X509SerialNumber']", serial_number)
|
|
_set_content("//*[local-name()='SignedSignatureProperties']/*[local-name()='SigningTime']", signing_time)
|
|
_set_content("//*[local-name()='SignedSignatureProperties']//*[local-name()='DigestValue']", public_key_hashing)
|
|
|
|
prehash_content = etree.tostring(root)
|
|
invoice_hash = self.env['account.edi.xml.ubl_21.zatca']._l10n_sa_generate_invoice_xml_hash(prehash_content,
|
|
'digest')
|
|
|
|
_set_content("//*[local-name()='SignatureValue']", signature)
|
|
_set_content("//*[local-name()='X509Certificate']", der_cert.decode())
|
|
_set_content("//*[local-name()='SignatureInformation']//*[local-name()='DigestValue']", invoice_hash)
|
|
_set_content("//*[@URI='#xadesSignedProperties']/*[local-name()='DigestValue']", signed_properties_hash)
|
|
|
|
return etree.tostring(root, with_tail=False)
|
|
|
|
def _l10n_sa_assert_clearance_status(self, invoice, clearance_data):
|
|
"""
|
|
Assert Clearance status. To be overridden in case there are any other cases to be accounted for
|
|
"""
|
|
mode = 'reporting' if invoice._l10n_sa_is_simplified() else 'clearance'
|
|
if mode == 'clearance' and clearance_data.get('clearanceStatus', '') != 'CLEARED':
|
|
return {'error': _("Invoice could not be cleared:\n%s", clearance_data), 'blocking_level': 'error'}
|
|
elif mode == 'reporting' and clearance_data.get('reportingStatus', '') != 'REPORTED':
|
|
return {'error': _("Invoice could not be reported:\n%s", clearance_data), 'blocking_level': 'error'}
|
|
return clearance_data
|
|
|
|
# ====== UBL Document Rendering & Submission =======
|
|
|
|
def _l10n_sa_postprocess_zatca_template(self, xml_content):
|
|
"""
|
|
Post-process xml content generated according to the ZATCA UBL specifications. Specifically, this entails:
|
|
- Force the xmlns:ext namespace on the root element (Invoice). This is required, since, by default
|
|
the generated UBL file does not have any ext namespaced element, so the namespace is removed
|
|
since it is unused.
|
|
"""
|
|
|
|
# Append UBLExtensions to the XML content
|
|
ubl_extensions = etree.fromstring(self.env['ir.qweb']._render('l10n_sa_edi.export_sa_zatca_ubl_extensions'))
|
|
root = etree.fromstring(xml_content)
|
|
root.insert(0, ubl_extensions)
|
|
|
|
# Force xmlns:ext namespace on UBl file
|
|
ns_map = {'ext': 'urn:oasis:names:specification:ubl:schema:xsd:CommonExtensionComponents-2'}
|
|
etree.cleanup_namespaces(root, top_nsmap=ns_map, keep_ns_prefixes=['ext'])
|
|
|
|
return etree.tostring(root, with_tail=False).decode()
|
|
|
|
def _l10n_sa_generate_zatca_template(self, invoice):
|
|
"""
|
|
Render the ZATCA UBL file
|
|
"""
|
|
xml_content, errors = self.env['account.edi.xml.ubl_21.zatca']._export_invoice(invoice)
|
|
if errors:
|
|
return {
|
|
'error': _("Could not generate Invoice UBL content: %s", ", \n".join(errors)),
|
|
'blocking_level': 'error'
|
|
}
|
|
return self._l10n_sa_postprocess_zatca_template(xml_content)
|
|
|
|
def _l10n_sa_submit_einvoice(self, invoice, signed_xml, PCSID_data):
|
|
"""
|
|
Submit a generated Invoice UBL file by making calls to the following APIs:
|
|
- A. Clearance API: Submit a standard Invoice to ZATCA for validation, returns signed UBL
|
|
- B. Reporting API: Submit a simplified Invoice to ZATCA for validation
|
|
"""
|
|
clearance_data = invoice.journal_id._l10n_sa_api_clearance(invoice, signed_xml.decode(), PCSID_data)
|
|
if clearance_data.get('json_errors'):
|
|
errors = [json.loads(j).get('validationResults', {}) for j in clearance_data['json_errors']]
|
|
error_msg = ''
|
|
is_warning = True
|
|
for error in errors:
|
|
validation_results = error.get('validationResults', {})
|
|
for err in validation_results.get('warningMessages', []):
|
|
error_msg += '\n - %s | %s' % (err['code'], err['message'])
|
|
for err in validation_results.get('errorMessages', []):
|
|
is_warning = False
|
|
error_msg += '\n - %s | %s' % (err['code'], err['message'])
|
|
return {
|
|
'error': error_msg,
|
|
'rejected': not is_warning,
|
|
'response': signed_xml.decode(),
|
|
'blocking_level': 'warning' if is_warning else 'error'
|
|
}
|
|
if not clearance_data.get('error'):
|
|
return self._l10n_sa_assert_clearance_status(invoice, clearance_data)
|
|
return clearance_data
|
|
|
|
def _l10n_sa_postprocess_einvoice_submission(self, invoice, signed_xml, clearance_data):
|
|
"""
|
|
Once an invoice has been successfully submitted, it is returned as a Cleared invoice, on which data
|
|
from ZATCA was applied. To be overridden to account for other cases, such as Reporting.
|
|
"""
|
|
if invoice._l10n_sa_is_simplified():
|
|
# if invoice is B2C, it is a SIMPLIFIED invoice, and thus it is only reported and returns
|
|
# no signed invoice. In this case, we just return the original content
|
|
return signed_xml.decode()
|
|
return b64decode(clearance_data['clearedInvoice']).decode()
|
|
|
|
def _l10n_sa_apply_qr_code(self, invoice, xml_content):
|
|
"""
|
|
Apply QR code on Invoice UBL content
|
|
"""
|
|
root = etree.fromstring(xml_content)
|
|
qr_code = invoice.l10n_sa_qr_code_str
|
|
qr_node = root.xpath('//*[local-name()="ID"][text()="QR"]/following-sibling::*/*')[0]
|
|
qr_node.text = qr_code
|
|
return etree.tostring(root, with_tail=False)
|
|
|
|
def _l10n_sa_get_signed_xml(self, invoice, unsigned_xml, certificate):
|
|
"""
|
|
Helper method to sign the provided XML, apply the QR code in the case if Simplified invoices (B2C), then
|
|
return the signed XML
|
|
"""
|
|
signed_xml = self._l10n_sa_sign_xml(unsigned_xml, certificate, invoice.l10n_sa_invoice_signature)
|
|
if invoice._l10n_sa_is_simplified():
|
|
# Applying with_prefetch() to set the _prefetch_ids = _ids,
|
|
# preventing premature QR code computation for other invoices.
|
|
invoice = invoice.with_prefetch()
|
|
return self._l10n_sa_apply_qr_code(invoice, signed_xml)
|
|
return signed_xml
|
|
|
|
def _l10n_sa_export_zatca_invoice(self, invoice, xml_content=None):
|
|
"""
|
|
Generate a ZATCA compliant UBL file, make API calls to authenticate, sign and include QR Code and
|
|
Cryptographic Stamp, then create an attachment with the final contents of the UBL file
|
|
"""
|
|
self.ensure_one()
|
|
|
|
# Prepare UBL invoice values and render XML file
|
|
unsigned_xml = xml_content or self._l10n_sa_generate_zatca_template(invoice)
|
|
|
|
# Load PCISD data and certificate
|
|
try:
|
|
PCSID_data, certificate = invoice.journal_id._l10n_sa_api_get_pcsid()
|
|
except UserError as e:
|
|
return ({
|
|
'error': _("Could not generate PCSID values:\n%(error)s", error=e.args[0]),
|
|
'blocking_level': 'error',
|
|
'response': unsigned_xml
|
|
}, unsigned_xml)
|
|
|
|
certificate_sudo = self.env['certificate.certificate'].sudo().browse(certificate)
|
|
|
|
# Apply Signature/QR code on the generated XML document
|
|
try:
|
|
signed_xml = self._l10n_sa_get_signed_xml(invoice, unsigned_xml, certificate_sudo)
|
|
except UserError as e:
|
|
return ({
|
|
'error': _("Could not generate signed XML values:\n%(error)s", error=e.args[0]),
|
|
'blocking_level': 'error',
|
|
'response': unsigned_xml
|
|
}, unsigned_xml)
|
|
|
|
# Once the XML content has been generated and signed, we submit it to ZATCA
|
|
return self._l10n_sa_submit_einvoice(invoice, signed_xml, PCSID_data), signed_xml
|
|
|
|
def _l10n_sa_check_partner_missing_info(self, partner_id, fields_to_check):
|
|
"""
|
|
Helper function to check if ZATCA mandated partner fields are missing for a specified partner record
|
|
"""
|
|
missing = []
|
|
for field in fields_to_check:
|
|
field_value = partner_id[field[0]]
|
|
if not field_value or (len(field) == 3 and not field[2](partner_id, field_value)):
|
|
missing.append(field[1])
|
|
return missing
|
|
|
|
def _l10n_sa_check_seller_missing_info(self, invoice):
|
|
"""
|
|
Helper function to check if ZATCA mandated partner fields are missing for the seller
|
|
"""
|
|
partner_id = invoice.company_id.partner_id.commercial_partner_id
|
|
fields_to_check = [
|
|
('l10n_sa_edi_building_number', _('Building Number for the Buyer is required on Standard Invoices')),
|
|
('street2', _('Neighborhood for the Seller is required on Standard Invoices')),
|
|
('l10n_sa_additional_identification_scheme',
|
|
_('Additional Identification Scheme is required for the Seller, and must be one of CRN, MOM, MLS, SAG or OTH'),
|
|
lambda p, v: v in ('CRN', 'MOM', 'MLS', 'SAG', 'OTH')
|
|
),
|
|
('vat',
|
|
_('VAT is required when Identification Scheme is set to Tax Identification Number'),
|
|
lambda p, v: p.l10n_sa_additional_identification_scheme != 'TIN'
|
|
),
|
|
('state_id', _('State / Country subdivision'))
|
|
]
|
|
return self._l10n_sa_check_partner_missing_info(partner_id, fields_to_check)
|
|
|
|
def _l10n_sa_check_buyer_missing_info(self, invoice):
|
|
"""
|
|
Helper function to check if ZATCA mandated partner fields are missing for the buyer
|
|
"""
|
|
fields_to_check = []
|
|
if any(tax.l10n_sa_exemption_reason_code in ('VATEX-SA-HEA', 'VATEX-SA-EDU') for tax in
|
|
invoice.invoice_line_ids.filtered(
|
|
lambda line: line.display_type == 'product').tax_ids):
|
|
fields_to_check += [
|
|
('l10n_sa_additional_identification_scheme',
|
|
_('Additional Identification Scheme is required for the Buyer if tax exemption reason is either '
|
|
'VATEX-SA-HEA or VATEX-SA-EDU, and its value must be NAT'), lambda p, v: v == 'NAT'),
|
|
('l10n_sa_additional_identification_number',
|
|
_('Additional Identification Number is required for commercial partners'),
|
|
lambda p, v: p.l10n_sa_additional_identification_scheme != 'TIN'
|
|
),
|
|
]
|
|
elif invoice.commercial_partner_id.l10n_sa_additional_identification_scheme == 'TIN':
|
|
fields_to_check += [
|
|
('vat', _('VAT is required when Identification Scheme is set to Tax Identification Number'))
|
|
]
|
|
if not invoice._l10n_sa_is_simplified() and invoice.partner_id.country_id.code == 'SA':
|
|
# If the invoice is a non-foreign, Standard (B2B), the Building Number and Neighborhood are required
|
|
fields_to_check += [
|
|
('l10n_sa_edi_building_number', _('Building Number for the Buyer is required on Standard Invoices')),
|
|
('street2', _('Neighborhood for the Buyer is required on Standard Invoices')),
|
|
]
|
|
return self._l10n_sa_check_partner_missing_info(invoice.commercial_partner_id, fields_to_check)
|
|
|
|
def _l10n_sa_post_zatca_edi(self, invoice): # no batch ensure that there is only one invoice
|
|
"""
|
|
Post invoice to ZATCA and return a dict of invoices and their success/attachment
|
|
"""
|
|
|
|
# Chain integrity check: chain head must have been REALLY posted, and did not time out
|
|
# When a submission times out, we reset the chain index of the invoice to False, so it has to be submitted again
|
|
# According to ZATCA, if we end up submitting the same invoice more than once, they will directly reach out
|
|
# to the taxpayer for clarifications
|
|
chain_head = invoice.journal_id._l10n_sa_get_last_posted_invoice()
|
|
if chain_head and chain_head != invoice and not chain_head._l10n_sa_is_in_chain():
|
|
return {invoice: {
|
|
'error': f"ZATCA: Cannot post invoice while chain head ({chain_head.name}) has not been posted",
|
|
'blocking_level': 'error',
|
|
'response': None,
|
|
}}
|
|
|
|
xml_content = None
|
|
if not invoice.l10n_sa_chain_index:
|
|
# If the Invoice doesn't have a chain index, it means it either has not been submitted before,
|
|
# or it was submitted and rejected. Either way, we need to assign it a new Chain Index and regenerate
|
|
# the data that depends on it before submitting (UUID, XML content, signature)
|
|
invoice.l10n_sa_chain_index = invoice.journal_id._l10n_sa_edi_get_next_chain_index()
|
|
xml_content = invoice._l10n_sa_generate_unsigned_data()
|
|
|
|
# Generate Invoice name for attachment
|
|
attachment_name = self.env['account.edi.xml.ubl_21.zatca']._export_invoice_filename(invoice)
|
|
|
|
# Generate XML, sign it, then submit it to ZATCA
|
|
response_data, submitted_xml = self._l10n_sa_export_zatca_invoice(invoice, xml_content)
|
|
|
|
# Check for submission errors
|
|
if response_data.get('error'):
|
|
|
|
# If the request was rejected, we save the signed xml content as an attachment
|
|
if response_data.get('rejected'):
|
|
invoice._l10n_sa_log_results(submitted_xml, response_data, error=True)
|
|
|
|
# If the request returned an exception (Timeout, ValueError... etc.) it means we're not sure if the
|
|
# invoice was successfully cleared/reported, and thus we keep the Index Chain.
|
|
# Else, we recalculate the submission Index (ICV), UUID, XML content and Signature
|
|
if not response_data.get('excepted'):
|
|
invoice.l10n_sa_chain_index = False
|
|
|
|
return {
|
|
invoice: {
|
|
**response_data,
|
|
'response': submitted_xml
|
|
}
|
|
}
|
|
|
|
# Once submission is done with no errors, check submission status
|
|
cleared_xml = self._l10n_sa_postprocess_einvoice_submission(invoice, submitted_xml, response_data)
|
|
|
|
# Save the submitted/returned invoice XML content once the submission has been completed successfully
|
|
invoice._l10n_sa_log_results(cleared_xml.encode(), response_data)
|
|
return {
|
|
invoice: {
|
|
'success': True,
|
|
'response': cleared_xml,
|
|
'message': '',
|
|
'attachment': self.env['ir.attachment'].create({
|
|
'name': attachment_name,
|
|
'raw': cleared_xml.encode(),
|
|
'res_model': 'account.move',
|
|
'res_id': invoice.id,
|
|
'mimetype': 'application/xml'
|
|
})
|
|
}
|
|
}
|
|
|
|
# ====== EDI Format Overrides =======
|
|
|
|
def _is_required_for_invoice(self, invoice):
|
|
"""
|
|
Override to add ZATCA edi checks on required invoices
|
|
"""
|
|
self.ensure_one()
|
|
if self.code != 'sa_zatca':
|
|
return super()._is_required_for_invoice(invoice)
|
|
|
|
return invoice.is_sale_document() and invoice.country_code == 'SA'
|
|
|
|
def _check_move_configuration(self, invoice):
|
|
"""
|
|
Override to add ZATCA compliance checks on the Invoice
|
|
"""
|
|
|
|
journal = invoice.journal_id
|
|
company = invoice.company_id
|
|
|
|
errors = super()._check_move_configuration(invoice)
|
|
if self.code != 'sa_zatca' or company.country_id.code != 'SA':
|
|
return errors
|
|
|
|
if invoice.commercial_partner_id == invoice.company_id.partner_id.commercial_partner_id:
|
|
errors.append(_("- You cannot post invoices where the Seller is the Buyer"))
|
|
|
|
if not all(line.tax_ids for line in invoice.invoice_line_ids.filtered(lambda line: line.display_type == 'product' and line._check_edi_line_tax_required())):
|
|
errors.append(_("- Invoice lines should have at least one Tax applied."))
|
|
|
|
if not journal._l10n_sa_ready_to_submit_einvoices():
|
|
errors.append(
|
|
_("- Finish the Onboarding procees for journal %s by requesting the CSIDs and completing the checks.", journal.name))
|
|
|
|
if not company._l10n_sa_check_organization_unit():
|
|
errors.append(
|
|
_("- The company VAT identification must contain 15 digits, with the first and last digits being '3' as per the BR-KSA-39 and BR-KSA-40 of ZATCA KSA business rule."))
|
|
if not company.sudo().l10n_sa_private_key_id:
|
|
errors.append(
|
|
_("- No Private Key was generated for company %s. A Private Key is mandatory in order to generate Certificate Signing Requests (CSR).", company.name))
|
|
if not journal.l10n_sa_serial_number:
|
|
errors.append(
|
|
_("- No Serial Number was assigned for journal %s. A Serial Number is mandatory in order to generate Certificate Signing Requests (CSR).", journal.name))
|
|
|
|
supplier_missing_info = self._l10n_sa_check_seller_missing_info(invoice)
|
|
customer_missing_info = self._l10n_sa_check_buyer_missing_info(invoice)
|
|
|
|
if supplier_missing_info:
|
|
errors.append(
|
|
_(
|
|
"- Please, set the following fields on the Supplier: %(missing_fields)s",
|
|
missing_fields=format_list(self.env, supplier_missing_info),
|
|
)
|
|
)
|
|
if customer_missing_info:
|
|
errors.append(
|
|
_(
|
|
"- Please, set the following fields on the Customer: %(missing_fields)s",
|
|
missing_fields=format_list(self.env, customer_missing_info),
|
|
)
|
|
)
|
|
if invoice.invoice_date > fields.Date.context_today(self.with_context(tz='Asia/Riyadh')):
|
|
errors.append(_("- Please, make sure the invoice date is set to either the same as or before Today."))
|
|
if invoice.move_type in ('in_refund', 'out_refund') and not invoice._l10n_sa_check_refund_reason():
|
|
errors.append(
|
|
_("- Please, make sure either the Reversed Entry or the Reversal Reason are specified when confirming a Credit/Debit note"))
|
|
return errors
|
|
|
|
def _needs_web_services(self):
|
|
"""
|
|
Override to add a check on edi document format code
|
|
"""
|
|
self.ensure_one()
|
|
return self.code == 'sa_zatca' or super()._needs_web_services()
|
|
|
|
def _is_compatible_with_journal(self, journal):
|
|
"""
|
|
Override to add a check on journal type & country code (SA)
|
|
"""
|
|
self.ensure_one()
|
|
if self.code != 'sa_zatca':
|
|
return super()._is_compatible_with_journal(journal)
|
|
return journal.type == 'sale' and journal.country_code == 'SA'
|
|
|
|
def _l10n_sa_get_invoice_content_edi(self, invoice):
|
|
"""
|
|
Return contents of the submitted UBL file or generate it if the invoice has not been submitted yet
|
|
"""
|
|
doc = invoice.edi_document_ids.filtered(lambda d: d.edi_format_id.code == 'sa_zatca' and d.state == 'sent')
|
|
return doc.attachment_id.raw or self._l10n_sa_generate_zatca_template(invoice).encode()
|
|
|
|
def _get_move_applicability(self, move):
|
|
# EXTENDS account_edi
|
|
self.ensure_one()
|
|
if self.code != 'sa_zatca' or move.country_code != 'SA' or move.move_type not in ('out_invoice', 'out_refund'):
|
|
return super()._get_move_applicability(move)
|
|
|
|
return {
|
|
'post': self._l10n_sa_post_zatca_edi,
|
|
'edi_content': self._l10n_sa_get_invoice_content_edi,
|
|
}
|