Odoo18-Base/addons/l10n_it_edi/models/account_move.py
2025-01-06 10:57:38 +07:00

1822 lines
89 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
from base64 import b64encode
from datetime import datetime
import logging
from lxml import etree
import uuid
from odoo import _, api, Command, fields, models, modules
from odoo.addons.base.models.ir_qweb_fields import Markup, nl2br, nl2br_enclose
from odoo.addons.account_edi_proxy_client.models.account_edi_proxy_user import AccountEdiProxyError
from odoo.exceptions import UserError
from odoo.tools import float_compare, float_repr, cleanup_xml_node, float_is_zero
_logger = logging.getLogger(__name__)
WAITING_STATES = ('being_sent', 'processing', 'forward_attempt')
# -------------------------------------------------------------------------
# XML tool functions
# -------------------------------------------------------------------------
def get_text(tree, xpath, many=False):
    """Return the stripped text of the nodes matched by ``xpath``.

    :param tree: object exposing an ``xpath()`` method returning nodes with a ``text`` attribute.
    :param xpath: expression handed over to ``tree.xpath``.
    :param many: when truthy, return every non-empty text as a list.
    :return: a list of strings when ``many`` is truthy; otherwise the first
        text found, or ``''`` when there is none.
    """
    found = []
    for node in tree.xpath(xpath):
        # Nodes whose text is None or empty are ignored.
        if node.text:
            found.append(node.text.strip())
    if many:
        return found
    return found[0] if found else ''
def get_float(tree, xpath):
    """Return the first text matched by ``xpath`` converted to a float.

    :return: the parsed number, or ``0.0`` when a ``ValueError`` is raised
        (e.g. nothing matched and the text is empty).
    """
    try:
        number = float(get_text(tree, xpath))
    except ValueError:
        number = 0.0
    return number
def get_date(tree, xpath):
    """Return the date part of the datetime found at ``xpath``, or False.

    Dates in FatturaPA are ISO 8601 date format, pattern '[-]CCYY-MM-DD[Z|(+|-)hh:mm]'.
    """
    parsed = get_datetime(tree, xpath)
    if not parsed:
        return False
    return parsed.date()
def get_datetime(tree, xpath):
    """Parse the first text matched by ``xpath`` as an ISO 8601 datetime.

    Datetimes in FatturaPA follow the pattern '[-]CCYY-MM-DDThh:mm:ss[Z|(+|-)hh:mm]'.
    A 'Z' suffix is rewritten as '+00:00' before parsing, because
    ``datetime.fromisoformat`` does not accept it on older Python versions.

    :return: a ``datetime``, or False when there is no text or it cannot be parsed.
    """
    raw_value = get_text(tree, xpath)
    if not raw_value:
        return False
    try:
        return datetime.fromisoformat(raw_value.replace('Z', '+00:00'))
    except (ValueError, TypeError):
        return False
# Extension of `account.move` declaring the fields used by the Italian EDI code below.
class AccountMove(models.Model):
_inherit = 'account.move'
# State of the move with respect to the SdI. Not copied on duplication; changes are tracked.
# The values listed in WAITING_STATES are the ones the cron keeps polling for updates.
l10n_it_edi_state = fields.Selection(
string="SDI State",
selection=[
('being_sent', 'Being Sent To SdI'),
('requires_user_signature', 'Requires user signature'),
('processing', 'SdI Processing'),
('rejected', 'SdI Rejected'),
('forwarded', 'SdI Accepted, Forwarded to Partner'),
('forward_failed', 'SdI Accepted, Forward to Partner Failed'),
('forward_attempt', 'SdI Accepted, Forwarding to Partner'),
('accepted_by_pa_partner', 'SdI Accepted, Accepted by the PA Partner'),
('rejected_by_pa_partner', 'SdI Accepted, Rejected by the PA Partner'),
('accepted_by_pa_partner_after_expiry', 'SdI Accepted, PA Partner Expired Terms'),
],
copy=False, tracking=True,
help="This state is updated by default, but you can force the value. ",
)
# Read-only HTML message for the user; it is reset in `_post` and filled with
# the export errors in `action_l10n_it_edi_send`.
l10n_it_edi_header = fields.Html(
help='User description of the current state, with hints to make the flow progress',
readonly=True,
copy=False,
)
# Transaction reference; while it is set, the "reset to draft" button is hidden
# (see `_compute_show_reset_to_draft_button`).
l10n_it_edi_transaction = fields.Char(copy=False, string="FatturaPA Transaction")
# Binary content stored as an attachment, plus a computed link to the matching
# `ir.attachment` record.
l10n_it_edi_attachment_file = fields.Binary(copy=False, attachment=True)
l10n_it_edi_attachment_id = fields.Many2one(
comodel_name='ir.attachment',
string="FatturaPA Attachment",
compute=lambda self: self._compute_linked_attachment_id('l10n_it_edi_attachment_id', 'l10n_it_edi_attachment_file'),
depends=['l10n_it_edi_attachment_file'],
)
# True for purchase documents carrying a tax tag of an Italian "VJ%" report line.
l10n_it_edi_is_self_invoice = fields.Boolean(compute="_compute_l10n_it_edi_is_self_invoice")
# Float labelled "Dati Bollo" (stamp duty data).
l10n_it_stamp_duty = fields.Float(string="Dati Bollo")
l10n_it_ddt_id = fields.Many2one('l10n_it.ddt', string='DDT', copy=False)
# Reference to the document this move originates from (type, name, date); none is copied.
l10n_it_origin_document_type = fields.Selection(
string="Origin Document Type",
selection=[('purchase_order', 'Purchase Order'), ('contract', 'Contract'), ('agreement', 'Agreement')],
copy=False)
l10n_it_origin_document_name = fields.Char(
string="Origin Document Name",
copy=False)
l10n_it_origin_document_date = fields.Date(
string="Origin Document Date",
copy=False)
# Identifiers described by their `help` texts; not copied.
l10n_it_cig = fields.Char(
string="CIG",
copy=False,
help="Tender Unique Identifier")
l10n_it_cup = fields.Char(
string="CUP",
copy=False,
help="Public Investment Unique Identifier")
# Technical field for showing the above fields or not
l10n_it_partner_pa = fields.Boolean(compute='_compute_l10n_it_partner_pa')
# -------------------------------------------------------------------------
# Computes
# -------------------------------------------------------------------------
@api.depends('commercial_partner_id.l10n_it_pa_index', 'company_id')
def _compute_l10n_it_partner_pa(self):
    """Flag the moves whose commercial partner is a public administration
    or has a 7-character ``l10n_it_pa_index``."""
    for record in self:
        commercial_partner = record.commercial_partner_id
        if commercial_partner:
            is_pa = commercial_partner._l10n_it_edi_is_public_administration()
            record.l10n_it_partner_pa = is_pa or len(commercial_partner.l10n_it_pa_index or '') == 7
        else:
            # No partner: the (empty, hence falsy) value is assigned as is.
            record.l10n_it_partner_pa = commercial_partner
@api.depends('move_type', 'line_ids.tax_tag_ids')
def _compute_l10n_it_edi_is_self_invoice(self):
    """Mark purchase documents that must be sent as self-invoices.

    Italian EDI requires Vendor bills coming from EU countries to be sent as self-invoices.
    Such bills are recognized through their taxes: a purchase document is a self-invoice
    when one of its lines carries a tag matching an Italian tax report line whose code
    starts with "VJ" (use of VAT External Reverse Charge). Any other move is never one.
    """
    purchase_moves = self.filtered(lambda m: m.is_purchase_document())
    for other_move in self - purchase_moves:
        other_move.l10n_it_edi_is_self_invoice = False
    if not purchase_moves:
        return

    vj_report_lines = self.env['account.report.line'].sudo().search([
        ('report_id.country_id.code', '=', 'IT'),
        ('code', '=like', 'VJ%')
    ])
    vj_tag_ids = set(vj_report_lines.expression_ids._get_matching_tags().ids)
    for bill in purchase_moves:
        bill_tag_ids = bill.line_ids.tax_tag_ids.ids
        bill.l10n_it_edi_is_self_invoice = not vj_tag_ids.isdisjoint(bill_tag_ids)
def _l10n_it_edi_exempt_reason_tag_mapping(self):
return {
"N3.2": "VJ3",
"N3.3": "VJ1",
"N6.1": "VJ6",
"N6.2": "VJ7",
"N6.3": "VJ12",
"N6.4": "VJ13",
"N6.5": "VJ14",
"N6.6": "VJ15",
"N6.7": "VJ16",
"N6.8": "VJ17",
}
# -------------------------------------------------------------------------
# Overrides
# -------------------------------------------------------------------------
@api.depends('l10n_it_edi_transaction')
def _compute_show_reset_to_draft_button(self):
    """Hide the "reset to draft" button on moves having an EDI transaction."""
    # EXTENDS 'account'
    super()._compute_show_reset_to_draft_button()
    for record in self:
        if record.l10n_it_edi_transaction:
            visible = False
        else:
            # Keep whatever the parent computation decided.
            visible = record.show_reset_to_draft_button
        record.show_reset_to_draft_button = visible
def _get_edi_decoder(self, file_data, new=False):
    """Return the Italian EDI import method for files typed 'l10n_it_edi',
    and defer to the parent implementation for every other file."""
    # EXTENDS 'account'
    if file_data['type'] != 'l10n_it_edi':
        return super()._get_edi_decoder(file_data, new=new)
    return self._l10n_it_edi_import_invoice
def _post(self, soft=True):
    """Clear the Italian EDI header message, then post through the parent."""
    # EXTENDS 'account'
    moves = self.with_context(skip_is_manually_modified=True)
    moves.write({'l10n_it_edi_header': False})
    return super()._post(soft)
def _extend_with_attachments(self, attachments, new=False):
    """Process Italian EDI import files before the other attachments when
    prediction is enabled.

    Italy needs a custom order in prediction, since prediction generally deduces taxes
    from products, while in Italian EDI, taxes are generally explicit in the XML file
    while the product may not be labelled exactly the same as in the database.

    :return: the truthy result of the Italian import when there is one, otherwise
        the parent's result for the remaining attachments.
    """
    # Prediction is an enterprise feature.
    if not self._is_prediction_enabled():
        return super()._extend_with_attachments(attachments, new)

    italian_files = attachments.filtered(lambda att: att._is_l10n_it_edi_import_file())
    if italian_files:
        attachments -= italian_files
        no_prediction_self = self.with_context(disable_onchange_name_predictive=True)
        if handled := super(AccountMove, no_prediction_self)._extend_with_attachments(italian_files, new):
            return handled
    return super()._extend_with_attachments(attachments, new)
# -------------------------------------------------------------------------
# Business actions
# -------------------------------------------------------------------------
def action_l10n_it_edi_send(self):
    """Check the invoice data, attach the XML file and send it to the SdI.

    When the export check reports errors, nothing is sent: the messages are written
    to ``l10n_it_edi_header`` and a client reload action is returned instead.
    """
    self.ensure_one()
    errors = self._l10n_it_edi_export_data_check()
    if errors:
        # The 4th "_"-separated chunk of the error key names the model of the faulty records.
        models_by_keyword = {
            'partner': 'res.partner',
            'move': 'account.move',
            'company': 'res.company'
        }
        header_lines = []
        for error_key, error_data in errors.items():
            text = error_data['message']
            key_parts = error_key.split("_")
            model_name = models_by_keyword.get(key_parts[3], None) if len(key_parts) > 3 else None
            action = error_data.get('action') if model_name else None
            if action:
                if 'res_id' in action:
                    record_ids = [action['res_id']]
                else:
                    record_ids = action['domain'][0][2]
                names = self.env[model_name].browse(record_ids).mapped('display_name')
                text = f"{text} - {', '.join(names)}"
            header_lines.append(nl2br(text))

        # Update the vendor bill's header with the warning messages,
        # and force reload the view to make sure the header is loaded
        self.l10n_it_edi_header = Markup('<br/>').join(header_lines)
        return {
            'type': 'ir.actions.client',
            'tag': 'reload',
        }

    xml_attachment_vals = self._l10n_it_edi_get_attachment_values(pdf_values=None)
    self.env['ir.attachment'].create(xml_attachment_vals)
    # Drop the cached values so the attachment fields are read again after the creation.
    self.invalidate_recordset(fnames=['l10n_it_edi_attachment_id', 'l10n_it_edi_attachment_file'])
    self.message_post(attachment_ids=self.l10n_it_edi_attachment_id.ids)
    self._l10n_it_edi_send({self: xml_attachment_vals})
    self.is_move_sent = True
def action_check_l10n_it_edi(self):
    """Refresh the sending state of a move that is waiting for the SdI.

    :raises UserError: when the move has no transaction and is not in a waiting state.
    :return: a client reload action while the move is 'being_sent', otherwise None.
    """
    self.ensure_one()
    is_waiting = self.l10n_it_edi_transaction or self.l10n_it_edi_state in WAITING_STATES
    if not is_waiting:
        raise UserError(_("This move is not waiting for updates from the SdI."))
    if self.l10n_it_edi_state == 'being_sent':
        return {'type': 'ir.actions.client', 'tag': 'reload'}
    self._l10n_it_edi_update_send_state()
def button_draft(self):
    """Reset the Italian EDI state of every move before resetting it to draft."""
    # EXTENDS 'account'
    for record in self:
        record.l10n_it_edi_state = False
    return super().button_draft()
# -------------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------------
def _l10n_it_edi_ready_for_xml_export(self):
self.ensure_one()
return (
self.state == 'posted'
and self.company_id.account_fiscal_country_id.code == 'IT'
and self.journal_id.type == 'sale'
and self.l10n_it_edi_state in (False, 'rejected')
)
def _l10n_it_edi_add_base_lines_xml_values(self, base_lines_aggregated_values, is_downpayment):
    """Compute the per-line export values and store them in ``base_line['it_values']``.

    Every base line is updated in place: ``price_subtotal`` is overwritten with the raw
    untaxed total in currency, and ``it_values`` receives the line number, description,
    unit price, discounts/surcharges, tax rates, exempt reason and extra currency data.

    :param base_lines_aggregated_values: iterable of ``(base_line, aggregated_values)``
        pairs; the 1-based position in this iterable becomes ``numero_linea``.
    :param is_downpayment: whether the document itself is a down payment invoice. When it
        is not, negative lines get the names of their down payment moves appended to
        their description.
    """
    self.ensure_one()
    # Digits of the `quantity` field (second item returned by get_digits), capped at 8.
    quantita_pd = min(self.env['account.move.line']._fields['quantity'].get_digits(self.env)[1], 8)
    for index, (base_line, aggregated_values) in enumerate(base_lines_aggregated_values, start=1):
        line = base_line['record']
        tax_details = base_line['tax_details']
        discount = base_line['discount']
        quantity = base_line['quantity']
        price_subtotal = base_line['price_subtotal'] = tax_details['raw_total_excluded_currency']
        it_values = base_line['it_values'] = {}

        # Description.
        # Down payment lines:
        # If there was a down paid amount that has been deducted from this move,
        # we need to put a reference to the down payment invoice in the DatiFattureCollegate tag
        # An empty name may come as False: normalize it to '' so that the f-string
        # below never renders the literal text "False".
        description = line.name or ''
        if not is_downpayment and price_subtotal < 0:
            downpayment_moves = line._get_downpayment_lines().move_id
            if downpayment_moves:
                downpayment_moves_description = ', '.join(downpayment_moves.mapped('name'))
                sep = ', ' if description else ''
                description = f"{description}{sep}{downpayment_moves_description}"
        description = description or "NO NAME"

        # Price unit.
        if quantity:
            it_values['prezzo_unitario'] = base_line['gross_price_subtotal'] / quantity
        else:
            it_values['prezzo_unitario'] = 0.0

        # Discount.
        discount_list = it_values['sconto_maggiorazione_list'] = []
        # Part of the discount amount that was added after 'discount_amount_before_dispatching'
        # was recorded; it is exported as a fixed amount per unit.
        delta_discount = base_line.get('discount_amount', 0.0) - base_line.get('discount_amount_before_dispatching', 0.0)
        if discount:
            discount_list.append({
                # A negative discount percentage is exported as 'MG' instead of 'SC'.
                'tipo': 'SC' if discount > 0 else 'MG',
                'percentuale': abs(discount),
                'importo': None,
            })
        if not base_line['currency_id'].is_zero(delta_discount):
            discount_list.append({
                'tipo': 'SC',
                'percentuale': None,
                'importo': abs(delta_discount / (quantity or 1.0)),
            })

        # Tax rates.
        rates = it_values['aliquota_iva_list'] = []
        for values in aggregated_values.values():
            grouping_key = values['grouping_key']
            if not grouping_key or grouping_key['skip']:
                continue
            # Taxes that are not percentages are exported with a 0.0 rate.
            rates.append(grouping_key['tax_amount_field'] if grouping_key['tax_amount_type_field'] == 'percent' else 0.0)

        # Tax exempt reason.
        vat_tax = base_line['tax_ids'].flatten_taxes_hierarchy().filtered(lambda t: t._l10n_it_filter_kind('vat') and t.amount >= 0)[:1]
        it_values['natura'] = vat_tax.l10n_it_exempt_reason or None

        # Other data.
        other_data_list = it_values['altri_dati_gestionali_list'] = []
        if base_line['currency_id'] != self.company_currency_id:
            other_data_list.extend([
                {
                    'tipo_dato': 'DIVISA',
                    'riferimento_testo': base_line['currency_id'].name,
                    'riferimento_numero': tax_details['raw_total_excluded_currency'],
                    'riferimento_data': None,
                },
                {
                    'tipo_dato': 'CAMBIO',
                    'riferimento_testo': None,
                    'riferimento_numero': base_line['rate'],
                    'riferimento_data': self.invoice_date,
                },
            ])

        it_values.update({
            'numero_linea': index,
            'descrizione': description,
            'prezzo_totale': tax_details['raw_total_excluded'],
            'quantita': quantity,
            'quantita_pd': quantita_pd,
            'ritenuta': None,
        })
def _l10n_it_edi_get_tax_lines_xml_values(self, base_lines_aggregated_values, values_per_grouping_key):
    """Build one dict of export values per tax grouping key.

    :param base_lines_aggregated_values: iterable of ``(base_line, aggregated_values)`` pairs,
        used to compute the gap between the group base and the sum of the raw line bases.
    :param values_per_grouping_key: aggregated values of the whole document per grouping key.
    :return: list of dicts; ``arrotondamento`` is None when the gap is zero at 8 digits.
    """
    self.ensure_one()
    result = []
    for group_values in values_per_grouping_key.values():
        key = group_values['grouping_key']
        if key and not key['skip']:
            # Subtract the raw base of every line of this group from the group base.
            gap = group_values['base_amount']
            for _line, line_values in base_lines_aggregated_values:
                if key in line_values:
                    gap -= line_values[key]['raw_base_amount']
            result.append({
                'aliquota_iva': key['tax_amount_field'],
                'natura': key['l10n_it_exempt_reason'],
                'arrotondamento': None if float_is_zero(gap, precision_digits=8) else gap,
                'imponibile_importo': group_values['base_amount'],
                'imposta': group_values['tax_amount'],
                'esigibilita_iva': key['tax_exigibility_code'],
                'riferimento_normativo': key['l10n_it_law_reference'],
            })
    return result
@api.model
def _l10n_it_edi_is_neg_split_payment(self, tax_data):
    """Tell whether ``tax_data`` is the negative tax of a group containing a split
    payment tax.

    :return: False for a non-negative tax, the falsy ``tax_data['group']`` when there
        is no group, otherwise whether any child tax of the group is a split payment.
    """
    if not tax_data['tax'].amount < 0.0:
        return False
    group = tax_data['group']
    if not group:
        return group
    return any(child._l10n_it_is_split_payment() for child in group.children_tax_ids)
@api.model
def _l10n_it_edi_grouping_function_base_lines(self, base_line, tax_data):
    """Return the grouping key used to aggregate tax details per base line:
    the tax amount, its amount type, and whether the tax must be skipped."""
    tax = tax_data['tax']
    key = {}
    # A -11.5 tax is reported as -23.0.
    key['tax_amount_field'] = -23.0 if tax.amount == -11.5 else tax.amount
    key['tax_amount_type_field'] = tax.amount_type
    # Reverse charge taxes and negative split payment taxes are left out.
    key['skip'] = tax_data['is_reverse_charge'] or self._l10n_it_edi_is_neg_split_payment(tax_data)
    return key
@api.model
def _l10n_it_edi_grouping_function_tax_lines(self, base_line, tax_data):
    """Return the grouping key used to aggregate tax details into tax lines.

    The exigibility code is 'S' for split payment taxes, 'D' for taxes due on
    payment, 'I' for taxes due on invoice, and None otherwise.
    """
    tax = tax_data['tax']
    if tax._l10n_it_is_split_payment():
        exigibility_code = 'S'
    else:
        exigibility_code = {'on_payment': 'D', 'on_invoice': 'I'}.get(tax.tax_exigibility)
    return {
        # A -11.5 tax is reported as -23.0.
        'tax_amount_field': -23.0 if tax.amount == -11.5 else tax.amount,
        'l10n_it_exempt_reason': tax.l10n_it_exempt_reason,
        'l10n_it_law_reference': tax.l10n_it_law_reference,
        'tax_exigibility_code': exigibility_code,
        'tax_amount_type_field': tax.amount_type,
        'skip': tax_data['is_reverse_charge'] or self._l10n_it_edi_is_neg_split_payment(tax_data),
    }
@api.model
def _l10n_it_edi_grouping_function_total(self, base_line, tax_data):
    """Return True when the tax counts towards the document total, i.e. when it is
    neither a reverse charge tax nor a negative split payment tax."""
    if tax_data['is_reverse_charge']:
        return False
    return not self._l10n_it_edi_is_neg_split_payment(tax_data)
def _l10n_it_edi_get_values(self, pdf_values=None):
    """Collect the values used to render the XML file of this move.

    The product lines are turned into base lines with their tax details, negative
    lines are dispatched, and the per-line values, the tax lines and the document
    total are computed from them.

    :param pdf_values: optional dict with the ``name`` and ``raw`` content of a PDF;
        when falsy, ``pdf_name`` and ``pdf`` are False in the result.
    :return: dict of rendering values (lines, totals, parties, flags and helpers).
    """
    self.ensure_one()

    # Flags
    is_self_invoice = self.l10n_it_edi_is_self_invoice
    document_type = self._l10n_it_edi_get_document_type()

    # Represent if the document is a reverse charge refund in a single variable
    reverse_charge = document_type in ['TD16', 'TD17', 'TD18', 'TD19']
    is_downpayment = document_type in ['TD02']
    reverse_charge_refund = self.move_type == 'in_refund' and reverse_charge
    convert_to_euros = self.currency_id.name != 'EUR'

    # Base lines.
    base_amls = self.line_ids.filtered(lambda x: x.display_type == 'product')
    base_lines = [self._prepare_product_base_line_for_taxes_computation(x) for x in base_amls]
    tax_amls = self.line_ids.filtered(lambda x: x.display_type == 'tax')
    tax_lines = [self._prepare_tax_line_for_taxes_computation(x) for x in tax_amls]
    if reverse_charge_refund:
        for base_line in base_lines:
            base_line['price_unit'] *= -1
    AccountTax = self.env['account.tax']
    AccountTax._add_tax_details_in_base_lines(base_lines, self.company_id)

    downpayment_lines = []
    # Prepare for '_dispatch_negative_lines'
    # Iterate over a snapshot: down payment lines are removed from `base_lines` inside
    # the loop, and removing from the list being iterated would make the loop skip the
    # line that follows each removed one.
    for base_line in list(base_lines):
        tax_details = base_line['tax_details']
        discount = base_line['discount']
        price_unit = base_line['price_unit']
        quantity = base_line['quantity']
        price_subtotal = base_line['price_subtotal'] = tax_details['raw_total_excluded_currency']

        if discount == 100.0:
            # The subtotal is zero: the gross amount can only come from price and quantity.
            gross_price_subtotal_before_discount = price_unit * quantity
        else:
            gross_price_subtotal_before_discount = price_subtotal / (1 - discount / 100.0)

        base_line['gross_price_subtotal'] = gross_price_subtotal_before_discount
        base_line['discount_amount_before_dispatching'] = gross_price_subtotal_before_discount - price_subtotal

        # The tax "23% Ritenuta Agenti e Rappresentanti" is not supported because it's supposed to be a tax of 23% based on
        # 50% of the base amount. It's currently implemented as a -11.5% tax. So on 1000, it gives an amount of -115.
        # We need to fix the base amount from 1000 to 500.0.
        for tax_data in tax_details['taxes_data']:
            tax = tax_data['tax']
            tax_data['_tax_amount'] = tax.amount
            if tax.amount == -11.5:
                tax_data['_tax_amount'] = -23.0
                tax_data['raw_base_amount'] *= 0.5
                tax_data['raw_base_amount_currency'] *= 0.5

        if not is_downpayment:
            # Negative lines linked to down payment should stay negative
            line = base_line['record']
            if line.price_subtotal < 0 and line._get_downpayment_lines():
                downpayment_lines.append(base_line)
                base_lines.remove(base_line)

        if float_compare(quantity, 0, 2) < 0:
            # Negative quantity is refused by SDI, so we invert quantity and price_unit to keep the price_subtotal
            base_line.update({
                'quantity': -quantity,
                'price_unit': -price_unit,
            })

    dispatched_results = self.env['account.tax']._dispatch_negative_lines(base_lines)
    base_lines = dispatched_results['result_lines'] + dispatched_results['orphan_negative_lines'] + downpayment_lines
    AccountTax._round_base_lines_tax_details(base_lines, self.company_id, tax_lines=tax_lines)
    base_lines_aggregated_values = AccountTax._aggregate_base_lines_tax_details(base_lines, self._l10n_it_edi_grouping_function_base_lines)
    self._l10n_it_edi_add_base_lines_xml_values(base_lines_aggregated_values, is_downpayment)
    base_lines = sorted(base_lines, key=lambda base_line: base_line['it_values']['numero_linea'])

    # Tax lines.
    base_lines_aggregated_values = AccountTax._aggregate_base_lines_tax_details(base_lines, self._l10n_it_edi_grouping_function_tax_lines)
    values_per_grouping_key = AccountTax._aggregate_base_lines_aggregated_values(base_lines_aggregated_values)
    tax_lines = self._l10n_it_edi_get_tax_lines_xml_values(base_lines_aggregated_values, values_per_grouping_key)

    # Total of the document.
    base_lines_aggregated_values = AccountTax._aggregate_base_lines_tax_details(base_lines, self._l10n_it_edi_grouping_function_total)
    values_per_grouping_key = AccountTax._aggregate_base_lines_aggregated_values(base_lines_aggregated_values)
    importo_totale_documento = 0.0
    for values in values_per_grouping_key.values():
        grouping_key = values['grouping_key']
        # The total grouping function returns False for the taxes to leave out.
        if grouping_key is False:
            continue
        importo_totale_documento += values['base_amount_currency']
        importo_totale_documento += values['tax_amount_currency']

    company = self.company_id
    partner = self.commercial_partner_id
    sender = company
    # On a self-invoice the roles are swapped: the company is the buyer.
    buyer = partner if not is_self_invoice else company
    seller = company if not is_self_invoice else partner
    sender_info_values = company.partner_id._l10n_it_edi_get_values()
    buyer_info_values = (partner if not is_self_invoice else company.partner_id)._l10n_it_edi_get_values()
    seller_info_values = (company.partner_id if not is_self_invoice else partner)._l10n_it_edi_get_values()
    representative_info_values = company.l10n_it_tax_representative_partner_id._l10n_it_edi_get_values()

    if self._l10n_it_edi_is_simplified_document_type(document_type):
        formato_trasmissione = "FSM10"
    elif partner._l10n_it_edi_is_public_administration():
        formato_trasmissione = "FPA12"
    else:
        formato_trasmissione = "FPR12"

    # Reference line for finding the conversion rate used in the document
    # NOTE(review): this divides by `amount_total_signed`; confirm it cannot be zero
    # for a foreign currency move that has invoice lines.
    conversion_rate = float_repr(
        abs(self.amount_total / self.amount_total_signed), precision_digits=5,
    ) if convert_to_euros and self.invoice_line_ids else None

    # Reduce downpayment views to a single recordset
    downpayment_moves = self.invoice_line_ids._get_downpayment_lines().move_id

    return {
        'record': self,
        'base_lines': base_lines,
        'tax_lines': tax_lines,
        'importo_totale_documento': importo_totale_documento,
        'company': company,
        'partner': partner,
        'sender': sender,
        'buyer': buyer,
        'seller': seller,
        'representative': company.l10n_it_tax_representative_partner_id,
        'sender_info': sender_info_values,
        'buyer_info': buyer_info_values,
        'seller_info': seller_info_values,
        'representative_info': representative_info_values,
        'origin_document_type': self.l10n_it_origin_document_type,
        'origin_document_name': self.l10n_it_origin_document_name,
        'origin_document_date': self.l10n_it_origin_document_date,
        'cig': self.l10n_it_cig,
        'cup': self.l10n_it_cup,
        'currency': self.currency_id or self.company_currency_id if not convert_to_euros else self.env.ref('base.EUR'),
        'regime_fiscale': company.l10n_it_tax_system if not is_self_invoice else 'RF18',
        'is_self_invoice': is_self_invoice,
        'partner_bank': self.partner_bank_id,
        'formato_trasmissione': formato_trasmissione,
        'document_type': document_type,
        'payment_method': 'MP05',
        'downpayment_moves': downpayment_moves,
        'reconciled_moves': self._get_reconciled_invoices(),
        'rc_refund': reverse_charge_refund,
        'conversion_rate': conversion_rate,
        'balance_multiplicator': -1 if self.is_inbound() else 1,
        'abs': abs,
        'pdf_name': pdf_values['name'] if pdf_values else False,
        'pdf': b64encode(pdf_values['raw']).decode() if pdf_values else False,
    }
def _l10n_it_edi_services_or_goods(self):
    """Tell whether the invoice lines are about services, goods, or both.

    Services and goods have different tax grids when VAT is Reverse Charged, and they can't
    be mixed in the same invoice, because the TipoDocumento depends on which kind
    of product is bought and it's unambiguous.

    Each line (notes and sections excluded) contributes the scopes of its taxes having
    a ``tax_scope``; failing that, 'service' if its product is a service, else 'consu'.

    :return: "both" when both 'consu' and 'service' were collected, the empty list
        when nothing was collected, otherwise the last collected scope.
    """
    self.ensure_one()
    scopes = []
    for line in self.invoice_line_ids:
        if line.display_type in ('line_note', 'line_section'):
            continue
        scoped_taxes = line.tax_ids.filtered(lambda tax: tax.tax_scope)
        if scoped_taxes:
            scopes.extend(scoped_taxes.mapped('tax_scope'))
        elif line.product_id and line.product_id.type == 'service':
            scopes.append('service')
        else:
            scopes.append('consu')

    if set(scopes) == {'consu', 'service'}:
        return "both"
    if not scopes:
        return scopes
    return scopes[-1]
def _l10n_it_edi_goods_in_italy(self):
    """Tell whether a line of the move carries a tag of the Italian "VJ3" report line.

    There is a specific TipoDocumento (Document Type TD19) and tax grid (VJ3) for goods
    that are physically in Italy but are in a VAT deposit, meaning that the goods
    have not passed customs.
    """
    self.ensure_one()
    # NOTE(review): unlike `_compute_l10n_it_edi_is_self_invoice`, this search is not
    # done with sudo() - confirm the calling users can read `account.report.line`.
    vj3_report_lines = self.env['account.report.line'].search([
        ('report_id.country_id.code', '=', 'IT'),
        ('code', '=', 'VJ3'),
    ])
    vj3_tags = vj3_report_lines.expression_ids._get_matching_tags()
    move_tags = self.line_ids.tax_tag_ids
    return bool(move_tags & vj3_tags)
def _l10n_it_edi_is_simplified(self):
    """Tell whether the move qualifies as a Simplified Invoice.

    Simplified Invoices are a way for the invoice issuer to create an invoice with limited data.
    Example: a consultant goes to the restaurant and wants the invoice instead of the receipt,
    to be able to deduct the expense from his taxes. The Italian State allows the restaurant
    to issue a Simplified Invoice with the VAT number only, to speed things up, instead of
    requiring the address and other information about the buyer.
    Only invoices under the threshold of 400 Euros are allowed, to avoid this tool
    being abused for bigger transactions, which would mean less transparency for tax institutions.

    The checks, in order: the simplified export template exists, the move is not a
    self-invoice, the only failing partner check is the missing address, the buyer has
    no country or is in Italy, the buyer has a codice fiscale or a VAT starting with
    'IT' or two digits, and the total does not exceed 400.
    """
    self.ensure_one()
    if not self.env.ref('l10n_it_edi.account_invoice_it_simplified_FatturaPA_export', raise_if_not_found=False):
        return False
    if self.l10n_it_edi_is_self_invoice:
        return False

    buyer = self.commercial_partner_id
    failed_checks = buyer._l10n_it_edi_export_check(['partner_address_missing', 'partner_vat_codice_fiscale_missing'])
    if list(failed_checks.keys()) != ['l10n_it_edi_partner_address_missing']:
        return False
    if buyer.country_id and buyer.country_id.code != 'IT':
        return False

    has_italian_identifier = buyer.l10n_it_codice_fiscale or (
        buyer.vat and (buyer.vat[:2].upper() == 'IT' or buyer.vat[:2].isdecimal())
    )
    if not has_italian_identifier:
        return False
    return bool(self.amount_total <= 400)
def _l10n_it_edi_is_professional_fees(self):
    """Tell whether at least one invoice line (notes and sections excluded) is booked
    on an account carrying the professional fees tag.

    :return: False when the tag record does not exist or no line matches, True otherwise.
    """
    self.ensure_one()
    fees_tag = self.env.ref('l10n_it_edi.l10n_it_edi_professional_fees_tag', raise_if_not_found=False)
    if not fees_tag:
        return False
    for line in self.invoice_line_ids:
        if line.display_type in ('line_note', 'line_section'):
            continue
        if fees_tag.id in line.account_id.tag_ids.ids:
            return True
    return False
def _l10n_it_edi_features_for_document_type_selection(self):
    """Return the features of the move, to be compared with the requirements of
    each TDxx FatturaPA document type."""
    partner_values = self.commercial_partner_id._l10n_it_edi_get_values()
    services_or_goods = self._l10n_it_edi_services_or_goods()

    # Tag names are upper-cased and stripped of their "+" / "-" signs; empty names are dropped.
    tax_tags = set()
    for tag in self.line_ids.tax_tag_ids:
        normalized_name = (tag.name or '').upper().replace("+", "").replace("-", "")
        if normalized_name:
            tax_tags.add(normalized_name)

    features = {
        'move_types': self.move_type,
        'partner_in_eu': partner_values.get('in_eu', False),
        'partner_country_code': partner_values.get('country_code', False),
        'simplified': self._l10n_it_edi_is_simplified(),
        'self_invoice': self.l10n_it_edi_is_self_invoice,
        'tax_tags': tax_tags,
        'downpayment': self._is_downpayment(),
        'services_or_goods': services_or_goods,
    }
    # The tag lookup is only done when the move is about goods.
    features['goods_in_italy'] = services_or_goods == 'consu' and self._l10n_it_edi_goods_in_italy()
    features['professional_fees'] = self._l10n_it_edi_is_professional_fees()
    return features
def _l10n_it_edi_document_type_mapping(self):
""" Returns a dictionary with the required features for every TDxx FatturaPA document type """
return {
'TD01': {'move_types': ['out_invoice'],
'import_type': 'in_invoice',
'self_invoice': False,
'simplified': False,
'downpayment': False,
'professional_fees': False},
'TD02': {'move_types': ['out_invoice'],
'import_type': 'in_invoice',
'self_invoice': False,
'simplified': False,
'downpayment': True,
'professional_fees': False},
'TD03': {'move_types': ['out_invoice'],
'import_type': 'in_invoice',
'self_invoice': False,
'simplified': False,
'downpayment': True,
'professional_fees': True},
'TD04': {'move_types': ['out_refund'],
'import_type': 'in_refund',
'self_invoice': False,
'simplified': False},
'TD05': {'move_types': ['out_refund'],
'import_type': 'in_refund',
'self_invoice': False,
'simplified': False},
'TD06': {'move_types': ['out_invoice'],
'import_type': 'in_invoice',
'self_invoice': False,
'simplified': False,
'downpayment': False,
'professional_fees': True},
'TD07': {'move_types': ['out_invoice'],
'import_type': 'in_invoice',
'self_invoice': False,
'simplified': True},
'TD08': {'move_types': ['out_refund'],
'import_type': 'in_refund',
'self_invoice': False,
'simplified': True},
'TD09': {'move_types': ['out_invoice'],
'import_type': 'in_invoice',
'self_invoice': False,
'simplified': True},
'TD28': {'move_types': ['in_invoice', 'in_refund'],
'import_type': 'in_invoice',
'simplified': False,
'self_invoice': True,
'partner_country_code': "SM"},
'TD16': {'move_types': ['in_invoice', 'in_refund'],
'import_type': 'in_invoice',
'simplified': False,
'self_invoice': True,
'tax_tags': {'VJ6', 'VJ7', 'VJ8', 'VJ12', 'VJ13', 'VJ14', 'VJ15', 'VJ16', 'VJ17'}},
'TD17': {'move_types': ['in_invoice', 'in_refund'],
'import_type': 'in_invoice',
'simplified': False,
'self_invoice': True,
'services_or_goods': "service",
'tax_tags': {'VJ3'}},
'TD18': {'move_types': ['in_invoice', 'in_refund'],
'import_type': 'in_invoice',
'simplified': False,
'self_invoice': True,
'services_or_goods': "consu",
'goods_in_italy': False,
'partner_in_eu': True,
'tax_tags': {'VJ9'}},
'TD19': {'move_types': ['in_invoice', 'in_refund'],
'import_type': 'in_invoice',
'simplified': False,
'self_invoice': True,
'services_or_goods': "consu",
'goods_in_italy': True,
'tax_tags': {'VJ3'}},
}
def _l10n_it_edi_get_document_type(self):
""" Compare the features of the invoice to the requirements of each Document Type (TDxx)
FatturaPA until you find a valid one. """
def compare(actual_values, expected_values):
""" Compare a single entry from the invoice features with the one of the document_type """
if isinstance(expected_values, set | list | tuple):
# i.e. When we compare actual tax_tags from the invoice with expected tags, we see if there is at least one in common
if isinstance(actual_values, set):
return actual_values & set(expected_values)
# i.e. When we compare the move_type with the available ones, these can be more than one
return actual_values in expected_values
# We compare other features directly, one on one
return actual_values == expected_values
invoice_features = self._l10n_it_edi_features_for_document_type_selection()
for document_type_code, document_type_features in self._l10n_it_edi_document_type_mapping().items():
# By using a generator instead of a list, we can avoid some comparisons
if all(compare(invoice_values, document_type_features[k]) for k, invoice_values in invoice_features.items() if k in document_type_features):
return document_type_code
return False
def _l10n_it_edi_is_simplified_document_type(self, document_type):
mapping = self._l10n_it_edi_document_type_mapping()
return mapping.get(document_type, {}).get('simplified', False)
@api.model
def _l10n_it_buyer_seller_info(self):
    """Return, for the 'buyer' and the 'seller' roles, the XPaths of their section,
    VAT and codice fiscale, and the tax domain matching the role."""
    roles = (
        ('buyer', 'CessionarioCommittente', 'purchase'),
        ('seller', 'CedentePrestatore', 'sale'),
    )
    return {
        role: {
            'role': role,
            'section_xpath': f'//{section}',
            'vat_xpath': f'//{section}//IdCodice',
            'codice_fiscale_xpath': f'//{section}//CodiceFiscale',
            'type_tax_use_domain': [('type_tax_use', '=', tax_use)],
        }
        for role, section, tax_use in roles
    }
# -------------------------------------------------------------------------
# EDI: Import
# -------------------------------------------------------------------------
def cron_l10n_it_edi_download_and_update(self):
    """ Crons run with sudo(), with empty recordset. Remember that.

    For every Italian EDI proxy user that is not in demo mode: refresh the state of
    the company's moves waiting for the SdI, then download the incoming invoices.
    The cron is triggered again when at least one download reports that more
    invoices remain on the server.
    """
    retrigger = False
    for proxy_user in self.env['account_edi_proxy_client.user'].search([('proxy_type', '=', 'l10n_it_edi')]):
        proxy_user = proxy_user.with_company(proxy_user.company_id)
        if proxy_user.edi_mode != 'demo':
            moves_to_check = self.search([
                ('company_id', '=', proxy_user.company_id.id),
                ('l10n_it_edi_transaction', '!=', False),
                ('l10n_it_edi_state', 'in', WAITING_STATES)
            ])
            if moves_to_check:
                moves_to_check._l10n_it_edi_update_send_state()
            # Always run the download: `retrigger or download(...)` would short-circuit
            # and skip every remaining proxy user once one of them asked for a retrigger.
            if self._l10n_it_edi_download_invoices(proxy_user):
                retrigger = True

    # Retrigger download if there are still some on the server
    if retrigger:
        _logger.info('Retriggering "Receive invoices from the SdI"...')
        self.env.ref('l10n_it_edi.ir_cron_l10n_it_edi_download_and_update')._trigger()
def _l10n_it_edi_download_invoices(self, proxy_user):
    """Fetch the incoming invoices of ``proxy_user`` from the proxy, process them
    and acknowledge the processed transactions.

    :param proxy_user: the proxy user to download the invoices for.
    :return: True if there remain some invoices on the server to be downloaded,
        False otherwise (including when the download request fails).
    """
    server_url = proxy_user._get_server_url()

    # Download invoices
    try:
        invoices_data = proxy_user._make_request(
            f'{server_url}/api/l10n_it_edi/1/in/RicezioneInvoice',
            params={'recipient_codice_fiscale': proxy_user.company_id.l10n_it_codice_fiscale},
        )
    except AccountEdiProxyError as e:
        _logger.error('Error while receiving invoices from the SdI: %s', e)
        return False

    # Process the downloaded invoices
    outcome = self._l10n_it_edi_process_downloads(invoices_data, proxy_user)
    acknowledged = outcome['proxy_acks']
    if acknowledged:
        try:
            proxy_user._make_request(f'{server_url}/api/l10n_it_edi/1/ack', params={'transaction_ids': acknowledged})
        except AccountEdiProxyError as e:
            # A failed acknowledgement is only logged.
            _logger.error('Error while receiving file from the SdI: %s', e)

    return outcome['retrigger']
def _l10n_it_edi_process_downloads(self, invoices_data, proxy_user):
    """ Create a move for each downloaded invoice, then import the attachment content.

    Every attachment will be committed if stored successfully.
    Also moves will be committed one by one, even if imported incorrectly.
    Commits are skipped while a test is running.

    :param invoices_data: dictionary mapping each transaction id to the data of a
                          downloaded invoice; the keys 'filename', 'file' and 'key'
                          are read, 'current_num' and 'max_num' are optional.
    :param proxy_user: the proxy user the invoices were downloaded for; its company
                       is the one the moves are created in.
    :return: dictionary with the keys 'retrigger' (whether the download should be
             triggered again) and 'proxy_acks' (transaction ids to acknowledge).
    """
    proxy_acks = []
    retrigger = False
    moves = self.env['account.move']
    for id_transaction, invoice_data in invoices_data.items():

        # The IAP server has a maximum number of documents it can send.
        # If that maximum is reached, then we search for more
        # by re-triggering the download cron, avoiding the timeout.
        current_num = invoice_data.get('current_num', 0)
        max_num = invoice_data.get('max_num', 0)
        # Chained comparison: true when both numbers are equal and strictly positive.
        retrigger = retrigger or current_num == max_num > 0

        # `_l10n_it_edi_create_move_from_attachment` will create an empty move
        # then try and fill it with the content imported from the attachment.
        # Should the import fail, thanks to try..except and savepoint,
        # we will anyway end up with an empty `in_invoice` with the attachment posted on it.
        if move := self.with_company(proxy_user.company_id)._l10n_it_edi_create_move_with_attachment(
            invoice_data['filename'],
            invoice_data['file'],
            invoice_data['key'],
            proxy_user,
        ):
            if not modules.module.current_test:
                self.env.cr.commit()
            moves |= move
        # The transaction is acknowledged whether or not a move was created for it.
        proxy_acks.append(id_transaction)

    # Extend created moves with the related attachments and commit
    for move in moves:
        move._extend_with_attachments(move.l10n_it_edi_attachment_id, new=True)
        if not modules.module.current_test:
            self.env.cr.commit()

    return {"retrigger": retrigger, "proxy_acks": proxy_acks}
def _l10n_it_edi_create_move_with_attachment(self, filename, content, key, proxy_user):
    """ Store an incoming SdI file as the EDI attachment of a freshly created empty move.

    :param filename: name of the file to be saved.
    :param content: encrypted content of the file to be saved.
    :param key: key to decrypt the file.
    :param proxy_user: the AccountEdiProxyClientUser to use for decrypting the file
    :return: the created move, or False when an attachment with the same name already
             exists for the company or when the content cannot be decrypted.
    """
    company = proxy_user.company_id
    Attachment = self.env['ir.attachment'].sudo().with_company(company)

    # Name should be unique per company, the invoice already exists
    already_imported_domain = [
        ('name', '=', filename),
        ('res_model', '=', 'account.move'),
        ('res_field', '=', 'l10n_it_edi_attachment_file'),
        ('company_id', '=', company.id),
    ]
    if Attachment.search_count(already_imported_domain, limit=1):
        _logger.warning('E-invoice already exists: %s', filename)
        return False

    # Decrypt with the server key
    try:
        decrypted_content = proxy_user._decrypt_data(content, key)
    except Exception as e:  # noqa: BLE001
        _logger.warning("Cannot decrypt e-invoice: %s, %s", filename, e)
        return False

    # Create the attachment, an empty move, then attach the two
    move = self.with_company(company).create({})
    attachment_values = {
        'name': filename,
        'raw': decrypted_content,
        'type': 'binary',
        'res_model': 'account.move',
        'res_id': move.id,
        'res_field': 'l10n_it_edi_attachment_file'
    }
    attachment = Attachment.create(attachment_values)
    posting_move = move.with_context(
        account_predictive_bills_disable_prediction=True,
        no_new_invoice=True,
    )
    posting_move.message_post(attachment_ids=attachment.ids)
    return move
def _l10n_it_edi_search_partner(self, company, vat, codice_fiscale, email):
    """ Find the partner matching the identifiers read from the document.

    The lookups are tried in order (VAT, then Codice Fiscale with or without the
    'IT' prefix, then email / PEC email), skipping the identifiers that are empty.
    Each lookup is restricted by the company domain of `company`.

    :return: the first partner found, or an empty `res.partner` recordset.
    """
    Partner = self.env['res.partner']

    candidate_domains = []
    if vat:
        candidate_domains.append([('vat', 'ilike', vat)])
    if codice_fiscale:
        candidate_domains.append(
            [('l10n_it_codice_fiscale', 'in', ('IT' + codice_fiscale, codice_fiscale))])
    if email:
        candidate_domains.append(['|', ('email', '=', email), ('l10n_it_pec_email', '=', email)])

    for candidate_domain in candidate_domains:
        found = Partner.search(candidate_domain + Partner._check_company_domain(company), limit=1)
        if found:
            return found
    return Partner
def _l10n_it_edi_search_tax_for_import(self, company, percentage, extra_domain=None, l10n_it_exempt_reason=None):
    """ Returns the VAT, Withholding or Pension Fund tax that suits the conditions given
    and matches the percentage found in the XML for the company.

    :param company: the company whose taxes are searched.
    :param percentage: the tax percentage read from the XML.
    :param extra_domain: optional list of domain leaves appended to the search domain.
    :param l10n_it_exempt_reason: the exempt reason read from the XML, if any.
    :return: the first matching tax, or an empty recordset.
    """
    Tax = self.env['account.tax']

    # The tax "23% Ritenuta Agenti e Rappresentanti" is not supported because it's supposed to be a tax of 23% based on
    # 50% of the base amount. It's currently implemented as a -11.5% tax. So on 1000, it gives an amount of -115.
    # We need to fix the base amount from 1000 to 500.0.
    if percentage == -23.0:
        percentage = -11.5

    domain = [
        *Tax._check_company_domain(company),
        ('amount_type', '=', 'percent'),
    ] + (extra_domain or [])

    def only_positive_repartition(tax):
        return all(rep_line.factor_percent >= 0 for rep_line in tax.invoice_repartition_line_ids)

    def some_negative_repartition(tax):
        return any(rep_line.factor_percent < 0 for rep_line in tax.invoice_repartition_line_ids)

    # We suppose we're importing a file that comes in as a customer invoice where the sale tax will be 0%.
    # To retrieve the correct purchase tax, we examine the sale tax's l10n_it_exempt_reason.
    # We determine whether the l10n_it_exempt_reason is specific to reverse charge.
    reversed_tax_tag = self._l10n_it_edi_exempt_reason_tag_mapping().get(l10n_it_exempt_reason, '')
    if reversed_tax_tag:
        # In case of reverse charge, the purchase tax has a negative repartition line.
        domain += [('invoice_repartition_line_ids.tag_ids.name', '=', f'+{reversed_tax_tag.lower()}')]
        taxes = Tax.search(domain, order="amount desc").filtered(some_negative_repartition)
    else:
        # Normal VAT taxes have a known percentage and generally have all positive repartition lines
        domain += [('amount', '=', percentage), ('l10n_it_exempt_reason', '=', l10n_it_exempt_reason)]
        taxes = Tax.search(domain).filtered(only_positive_repartition)

    if not taxes:
        return taxes
    return taxes[0]
def _l10n_it_edi_get_extra_info(self, company, document_type, body_tree, incoming=True):
    """ Hook meant to collect other information that has to be inserted on the
    invoice lines by submodules.

    This base implementation only looks at `document_type` and `incoming`.

    :return: a tuple (extra_info, messages_to_log) where extra_info holds
             'simplified' and 'type_tax_use_domain', and messages_to_log is an empty list.
    """
    tax_use = 'purchase' if incoming else 'sale'
    is_simplified = self.env['account.move']._l10n_it_edi_is_simplified_document_type(document_type)
    extra_info = {
        'simplified': is_simplified,
        'type_tax_use_domain': [('type_tax_use', '=', tax_use)],
    }
    messages_to_log = []
    return extra_info, messages_to_log
def _l10n_it_edi_import_invoice(self, invoice, data, is_new):
    """ Decodes a l10n_it_edi move into an Odoo move.

    :param invoice: the move used to resolve purchase/sale move types and to post
                    the message when the company is not found in the document
    :param data:    the dictionary with the content to be imported
                    keys: 'filename', 'content', 'xml_tree', 'type', 'sort_weight'
    :param is_new:  whether the move is newly created or to be updated
                    (not read by this method)
    :returns:       the imported move, or None when the import is aborted
                    (unhandled 'default_move_type', or company not found in the document)
    """
    with self._get_edi_creation() as self:
        buyer_seller_info = self._l10n_it_buyer_seller_info()

        tree = data['xml_tree']
        company = self.company_id

        # There are 2 cases:
        # - cron:
        #     * Move direction (incoming / outgoing) flexible (no 'default_move_type')
        #     * I.e. used for import from tax agency
        # - "Upload" button (invoices / bills view)
        #     * Fixed move direction; the button sets the 'default_move_type'
        default_move_type = self.env.context.get('default_move_type')
        if default_move_type is None:
            incoming_possibilities = [True, False]
        elif default_move_type in invoice.get_purchase_types(include_receipts=True):
            incoming_possibilities = [True]
        elif default_move_type in invoice.get_sale_types(include_receipts=True):
            incoming_possibilities = [False]
        else:
            _logger.warning("Cannot handle default_move_type '%s'.", default_move_type)
            return

        # Find the direction for which the company matches the buyer (incoming) or
        # the seller (outgoing) section; the `else` runs when no direction matched.
        for incoming in incoming_possibilities:
            company_role, partner_role = ('buyer', 'seller') if incoming else ('seller', 'buyer')
            company_info = buyer_seller_info[company_role]
            vat = get_text(tree, company_info['vat_xpath'])
            if vat and vat.casefold() in (company.vat or '').casefold():
                break
            codice_fiscale = get_text(tree, company_info['codice_fiscale_xpath'])
            if codice_fiscale and codice_fiscale.casefold() in (company.l10n_it_codice_fiscale or '').casefold():
                break
        else:
            invoice.message_post(body=_("Your company's VAT number and Fiscal Code haven't been found in the buyer and/or seller sections inside the document."))
            return

        # For unsupported document types, just assume in_invoice, and log that the type is unsupported
        document_type = get_text(tree, '//DatiGeneraliDocumento/TipoDocumento')
        move_type = self._l10n_it_edi_document_type_mapping().get(document_type, {}).get('import_type')
        if not move_type:
            move_type = "in_invoice"
            _logger.info('Document type not managed: %s. Invoice type is set by default.', document_type)
        if not incoming and move_type.startswith('in_'):
            move_type = 'out' + move_type[2:]

        self.move_type = move_type

        if self.name and self.name != '/':
            # the journal might've changed, so we need to recompute the name in case it was set (first entry in journal)
            self.name = False
            self._compute_name()

        # Collect extra info from the XML that may be used by submodules to further put information on the invoice lines
        extra_info, message_to_log = self._l10n_it_edi_get_extra_info(company, document_type, tree, incoming=incoming)

        # Partner
        partner_info = buyer_seller_info[partner_role]
        vat = get_text(tree, partner_info['vat_xpath'])
        codice_fiscale = get_text(tree, partner_info['codice_fiscale_xpath'])
        email = get_text(tree, '//DatiTrasmissione//Email') if partner_info['role'] == 'seller' else ''
        if partner := self._l10n_it_edi_search_partner(company, vat, codice_fiscale, email):
            self.partner_id = partner
        else:
            message = Markup("<br/>").join((
                _("Partner not found, useful informations from XML file:"),
                self._compose_info_message(tree, partner_info['section_xpath'])
            ))
            message_to_log.append(message)

        # Numbering attributed by the transmitter
        if progressive_id := get_text(tree, '//ProgressivoInvio'):
            self.payment_reference = progressive_id

        # Document Number
        if number := get_text(tree, './/DatiGeneraliDocumento//Numero'):
            self.ref = number

        # Currency
        if currency_str := get_text(tree, './/DatiGeneraliDocumento/Divisa'):
            # `raise_if_not_found=False` yields None for an unknown currency code:
            # guard it before reading `active`.
            currency = self.env.ref('base.%s' % currency_str.upper(), raise_if_not_found=False)
            if currency and currency != self.env.company.currency_id and currency.active:
                self.currency_id = currency

        # Date
        if document_date := get_date(tree, './/DatiGeneraliDocumento/Data'):
            self.invoice_date = document_date
        else:
            message_to_log.append(_("Document date invalid in XML file: %s", document_date))

        # Stamp Duty
        if stamp_duty := get_text(tree, './/DatiGeneraliDocumento/DatiBollo/ImportoBollo'):
            self.l10n_it_stamp_duty = float(stamp_duty)

        # Comment
        for narration in get_text(tree, './/DatiGeneraliDocumento//Causale', many=True):
            self.narration = '%s%s<br/>' % (self.narration or '', narration)

        # Informations relative to the purchase order, the contract, the agreement,
        # the reception phase or invoices previously transmitted
        # <2.1.2> - <2.1.6>
        for section_name in ['DatiOrdineAcquisto', 'DatiContratto', 'DatiConvenzione', 'DatiRicezione', 'DatiFattureCollegate']:
            for element in tree.xpath('.//DatiGenerali/' + section_name):
                message = Markup("{} {}<br/>{}").format(section_name, _("from XML file:"), self._compose_info_message(element, '.'))
                message_to_log.append(message)

        #  Dati DDT. <2.1.8>
        if tree.xpath('.//DatiGenerali/DatiDDT'):
            message = Markup("<br/>").join((
                _("Transport informations from XML file:"),
                self._compose_info_message(tree, './/DatiGenerali/DatiDDT')
            ))
            message_to_log.append(message)

        # Due date. <2.4.2.5>
        if due_date := get_date(tree, './/DatiPagamento/DettaglioPagamento/DataScadenzaPagamento'):
            self.invoice_date_due = fields.Date.to_string(due_date)
        else:
            message_to_log.append(_("Payment due date invalid in XML file: %s", str(due_date)))

        # Information related to the purchase order <2.1.2>
        if (po_refs := get_text(tree, '//DatiGenerali/DatiOrdineAcquisto/IdDocumento', many=True)):
            self.invoice_origin = ", ".join(po_refs)

        # Total amount. <2.4.2.6>
        if amount_total := sum(float(x) for x in get_text(tree, './/ImportoPagamento', many=True) if x):
            message_to_log.append(_("Total amount from the XML File: %s", amount_total))

        # Bank account. <2.4.2.13>
        if self.move_type not in ('out_invoice', 'in_refund'):
            if acc_number := get_text(tree, './/DatiPagamento/DettaglioPagamento/IBAN'):
                if self.partner_id and self.partner_id.commercial_partner_id:
                    bank = self.env['res.partner.bank'].search([
                        ('acc_number', '=', acc_number),
                        ('partner_id', '=', self.partner_id.commercial_partner_id.id),
                        ('company_id', 'in', [self.company_id.id, False])
                    ], order='company_id', limit=1)
                else:
                    bank = self.env['res.partner.bank'].search([
                        ('acc_number', '=', acc_number),
                        ('company_id', 'in', [self.company_id.id, False])
                    ], order='company_id', limit=1)
                if bank:
                    self.partner_bank_id = bank
                else:
                    message = Markup("<br/>").join((
                        _("Bank account not found, useful informations from XML file:"),
                        self._compose_info_message(tree, [
                            './/DatiPagamento//Beneficiario',
                            './/DatiPagamento//IstitutoFinanziario',
                            './/DatiPagamento//IBAN',
                            './/DatiPagamento//ABI',
                            './/DatiPagamento//CAB',
                            './/DatiPagamento//BIC',
                            './/DatiPagamento//ModalitaPagamento'
                        ])
                    ))
                    message_to_log.append(message)
        elif tree.xpath('.//DatiPagamento/DettaglioPagamento'):
            message = Markup("<br/>").join((
                _("Bank account not found, useful informations from XML file:"),
                self._compose_info_message(tree, './/DatiPagamento')
            ))
            message_to_log.append(message)

        # Invoice lines. <2.2.1>
        tag_name = './/DettaglioLinee' if not extra_info['simplified'] else './/DatiBeniServizi'
        line_elements = tree.xpath(tag_name)
        for element in line_elements:
            move_line = self.invoice_line_ids.create({
                'move_id': self.id,
                'tax_ids': [fields.Command.clear()]})
            if move_line:
                message_to_log += self._l10n_it_edi_import_line(element, move_line, extra_info)

        # Global discount summarized in 1 amount
        if discount_elements := tree.xpath('.//DatiGeneraliDocumento/ScontoMaggiorazione'):
            taxable_amount = float(self.tax_totals['base_amount_currency'])
            discounted_amount = taxable_amount
            for discount_element in discount_elements:
                # 'MG' marks a surcharge: it is applied with the opposite sign of a discount.
                discount_sign = 1
                if (discount_type := discount_element.xpath('.//Tipo')) and discount_type[0].text == 'MG':
                    discount_sign = -1
                # A fixed amount takes precedence over a percentage on the same element.
                if discount_amount := get_text(discount_element, './/Importo'):
                    discounted_amount -= discount_sign * float(discount_amount)
                    continue
                if discount_percentage := get_text(discount_element, './/Percentuale'):
                    discounted_amount *= 1 - discount_sign * float(discount_percentage) / 100

            general_discount = discounted_amount - taxable_amount
            # Place the discount line right after the imported invoice lines.
            sequence = len(line_elements) + 1

            self.invoice_line_ids = [Command.create({
                'sequence': sequence,
                'name': 'SCONTO' if general_discount < 0 else 'MAGGIORAZIONE',
                'price_unit': general_discount,
            })]

        for element in tree.xpath('.//Allegati'):
            attachment_64 = self.env['ir.attachment'].create({
                'name': get_text(element, './/NomeAttachment'),
                'datas': str.encode(get_text(element, './/Attachment')),
                'type': 'binary',
                'res_model': 'account.move',
                'res_id': self.id,
            })

            # no_new_invoice to prevent from looping on the.message_post that would create a new invoice without it
            self.with_context(no_new_invoice=True).sudo().message_post(
                body=(_("Attachment from XML")),
                attachment_ids=[attachment_64.id],
            )

        for message in message_to_log:
            self.sudo().message_post(body=message)
        return self
@api.model
def _is_prediction_enabled(self):
    """ Return the `ir.module.module` record of 'account_accountant' when it is
    installed, an empty recordset otherwise (used as a truth value by callers). """
    installed_accountant_domain = [
        ('name', '=', 'account_accountant'),
        ('state', '=', 'installed'),
    ]
    return self.env['ir.module.module'].search(installed_accountant_domain)
def _l10n_it_edi_import_line(self, element, move_line, extra_info=None):
    """ Fill an invoice line with the content of one XML line element.

    :param element: the XML element describing the line.
    :param move_line: the invoice line to fill in.
    :param extra_info: optional dictionary; the keys 'simplified' and
                       'type_tax_use_domain' are read, both are optional.
    :return: the list of messages to log on the move.
    """
    extra_info = extra_info or {}
    company = move_line.company_id
    partner = move_line.partner_id
    message_to_log = []
    predict_enabled = self._is_prediction_enabled()

    # Sequence.
    line_elements = element.xpath('.//NumeroLinea')
    if line_elements:
        move_line.sequence = int(line_elements[0].text)

    # Name.
    move_line.name = " ".join(get_text(element, './/Descrizione').split())

    # Product.
    if elements_code := element.xpath('.//CodiceArticolo'):
        # First pass: EAN barcode, then the supplier's own product code.
        for element_code in elements_code:
            type_code = element_code.xpath('.//CodiceTipo')[0]
            code = element_code.xpath('.//CodiceValore')[0]
            product = self.env['product.product'].search([('barcode', '=', code.text)])
            if (product and type_code.text == 'EAN'):
                move_line.product_id = product
                break
            if partner:
                product_supplier = self.env['product.supplierinfo'].search([('partner_id', '=', partner.id), ('product_code', '=', code.text)], limit=2)
                if product_supplier and len(product_supplier) == 1 and product_supplier.product_id:
                    move_line.product_id = product_supplier.product_id
                    break
        # Second pass: internal reference, only when it identifies a single product.
        if not move_line.product_id:
            for element_code in elements_code:
                code = element_code.xpath('.//CodiceValore')[0]
                product = self.env['product.product'].search([('default_code', '=', code.text)], limit=2)
                if product and len(product) == 1:
                    move_line.product_id = product
                    break

    # If no product is found, try to find a product that may be fitting
    if predict_enabled and not move_line.product_id:
        fitting_product = move_line._predict_product()
        if fitting_product:
            # Setting the product may recompute the label: restore the imported one.
            name = move_line.name
            move_line.product_id = fitting_product
            move_line.name = name

    if predict_enabled:
        # Fitting account for the line
        fitting_account = move_line._predict_account()
        if fitting_account:
            move_line.account_id = fitting_account

    # Quantity.
    move_line.quantity = float(get_text(element, './/Quantita') or '1')

    # Taxes
    percentage = None
    # `.get` so that calling without `extra_info` imports the line as a non-simplified one
    # instead of raising a KeyError.
    if not extra_info.get('simplified'):
        percentage = get_float(element, './/AliquotaIVA')
        if price_unit := get_float(element, './/PrezzoUnitario'):
            move_line.price_unit = price_unit
    elif amount := get_float(element, './/Importo'):
        percentage = get_float(element, './/Aliquota')
        if not percentage and (tax_amount := get_float(element, './/Imposta')):
            percentage = round(tax_amount / (amount - tax_amount) * 100)
        # The amount read here is divided by (1 + rate) to get the unit price.
        move_line.price_unit = amount / (1 + percentage / 100)

    move_line.tax_ids = []
    if percentage is not None:
        l10n_it_exempt_reason = get_text(element, './/Natura').upper() or False
        extra_domain = extra_info.get('type_tax_use_domain', [('type_tax_use', '=', 'purchase')])
        if tax := self._l10n_it_edi_search_tax_for_import(company, percentage, extra_domain, l10n_it_exempt_reason=l10n_it_exempt_reason):
            move_line.tax_ids += tax
        else:
            message = Markup("<br/>").join((
                _("Tax not found for line with description '%s'", move_line.name),
                self._compose_info_message(element, '.')
            ))
            message_to_log.append(message)

    # If no taxes were found, try to find taxes that may be fitting
    if predict_enabled and not move_line.tax_ids:
        fitting_taxes = move_line._predict_taxes()
        if fitting_taxes:
            move_line.tax_ids = [Command.set(fitting_taxes)]

    # Discounts
    if elements := element.xpath('.//ScontoMaggiorazione'):
        # Special case of only 1 percentage discount
        if len(elements) == 1:
            discount_element = elements[0]
            if discount_percentage := get_float(discount_element, './/Percentuale'):
                discount_type = get_text(discount_element, './/Tipo')
                discount_sign = -1 if discount_type == 'MG' else 1
                move_line.discount = discount_sign * discount_percentage
        # Discounts in cascade summarized in 1 percentage
        else:
            total = get_float(element, './/PrezzoTotale')
            undiscounted_total = move_line.quantity * move_line.price_unit
            # No percentage can be derived from a zero base: leave the discount untouched
            # rather than raising a ZeroDivisionError.
            if undiscounted_total:
                move_line.discount = 100 - (100 * total) / undiscounted_total

    return message_to_log
def _l10n_it_edi_format_errors(self, header, errors):
    """ Render a header followed by a bullet list as markup.

    :param header: text placed in a <span> before the list; skipped when empty.
    :param errors: iterable of texts; the whitespace runs of each one are collapsed
                   to single spaces before it is enclosed in a <li>.
    :return: the resulting Markup.
    """
    header_html = nl2br_enclose(header, 'span') if header else ''
    items_html = Markup().join(
        nl2br_enclose(' '.join(error.split()), 'li')
        for error in errors
    )
    return Markup('{}<ul class="mb-0">{}</ul>').format(header_html, items_html)
def _compose_info_message(self, tree, tags):
    """ Build a message listing 'tag: text' for every node found under the given XPaths.

    :param tree: the XML node the XPath expressions are evaluated on.
    :param tags: a single XPath expression, or a list of them.
    :return: the concatenation of one bullet list per matched element
             (an empty string when nothing matches).
    """
    xpaths = tags if isinstance(tags, list) else [tags]
    result = ""
    for xpath in xpaths:
        for node in tree.xpath(xpath):
            # `iter()` walks the node itself and all its descendants.
            lines = [f'{subel.tag}: {subel.text}' for subel in node.iter()]
            result += self._l10n_it_edi_format_errors("", lines)
    return result
# -------------------------------------------------------------------------
# EDI: Export
# -------------------------------------------------------------------------
def _l10n_it_edi_export_data_check(self):
    """ Check the Settings, Company, Partners and Moves involved in the sending
    activity and return an errors dictionary ready for the actionable_errors
    widget to display.

    Partners are split in disjoint groups (checked in this order of priority:
    company partners, partners of full invoices, partners of simplified invoices,
    tax representatives) so that each partner is checked in one group only.
    """
    companies = self.mapped("company_id")
    company_partners = companies.mapped("partner_id")

    full_moves = self.filtered(lambda m: not m._l10n_it_edi_is_simplified())
    simplified_moves = self.filtered(lambda m: m._l10n_it_edi_is_simplified())

    full = full_moves.mapped("commercial_partner_id").filtered(
        lambda p: p not in company_partners)
    already_full = company_partners | full
    simplified = simplified_moves.mapped("commercial_partner_id").filtered(
        lambda p: p not in already_full)
    already_checked = company_partners | simplified | full
    representatives = companies.mapped("l10n_it_tax_representative_partner_id").filtered(
        lambda p: p not in already_checked)

    # Later checks override earlier ones on identical keys, hence the fixed order.
    errors = {}
    errors.update(companies._l10n_it_edi_export_check())
    errors.update(full._l10n_it_edi_export_check(['partner_address_missing']))
    errors.update(simplified._l10n_it_edi_export_check(['partner_country_missing']))
    errors.update((simplified | full)._l10n_it_edi_export_check(['partner_vat_codice_fiscale_missing']))
    errors.update(representatives._l10n_it_edi_export_check(['partner_vat_missing']))
    errors.update(self._l10n_it_edi_base_export_check())
    errors.update(self._l10n_it_edi_export_taxes_check())
    return errors
def _l10n_it_edi_base_export_check(self):
    """ Check the moves themselves before export.

    :return: dictionary of errors keyed by error code; each error holds a 'message'
             and, when several moves are checked at once, an action to open the
             faulty moves.
    """
    def build_error(message, records):
        error = {'message': message}
        if len(self) > 1:
            error['action_text'] = _("View invoice(s)")
            error['action'] = records._get_records_action(name=_("Invoice(s) to check"))
        return error

    errors = {}

    # Reverse charge self-invoices cannot mix services and goods.
    mixed_moves = self.filtered(
        lambda move: move.l10n_it_edi_is_self_invoice and move._l10n_it_edi_services_or_goods() == 'both')
    if mixed_moves:
        errors['l10n_it_edi_move_rc_mixed_product_types'] = build_error(
            message=_("Cannot apply Reverse Charge to bills which contains both services and goods."),
            records=mixed_moves)

    # Moves addressed to the Public Administration.
    pa_moves = self.filtered(lambda move: move.commercial_partner_id._l10n_it_edi_is_public_administration())
    if pa_moves:
        without_type = pa_moves.filtered(lambda move: not move.l10n_it_origin_document_type)
        if without_type:
            message = _("Partner(s) belongs to the Public Administration, please fill out Origin Document Type field in the Electronic Invoicing tab.")
            errors['move_missing_origin_document'] = build_error(message=message, records=without_type)
        future_dated = pa_moves.filtered(
            lambda move: move.l10n_it_origin_document_date and move.l10n_it_origin_document_date > fields.Date.today())
        if future_dated:
            message = _("The Origin Document Date cannot be in the future.")
            errors['l10n_it_edi_move_future_origin_document_date'] = build_error(message=message, records=future_dated)

    # Moves whose partner has a 7 characters PA index.
    indexed_moves = self.filtered(lambda move: len(move.commercial_partner_id.l10n_it_pa_index or '') == 7)
    if indexed_moves:
        cig_cup_without_type = indexed_moves.filtered(
            lambda move: not move.l10n_it_origin_document_type and move.l10n_it_cig and move.l10n_it_cup)
        if cig_cup_without_type:
            message = _("CIG/CUP fields of partner(s) are present, please fill out Origin Document Type field in the Electronic Invoicing tab.")
            errors['move_missing_origin_document_field'] = build_error(message=message, records=cig_cup_without_type)

    return errors
def _l10n_it_edi_export_taxes_check(self):
    """ Check that every product line carries exactly one VAT tax.

    :return: an empty dictionary when all lines are fine, otherwise a dictionary
             holding the 'l10n_it_edi_move_only_one_vat_tax_per_line' error.
    """
    def has_wrong_vat_count(line):
        if line.display_type != 'product':
            return False
        vat_taxes = line.tax_ids.flatten_taxes_hierarchy()._l10n_it_filter_kind('vat')
        return len(vat_taxes) != 1

    faulty_lines = self.mapped("invoice_line_ids").filtered(has_wrong_vat_count)
    if not faulty_lines:
        return {}

    error = {'message': _("Invoices must have exactly one VAT tax set per line.")}
    # The action to open the moves is only offered when several moves are checked.
    if len(self) > 1:
        error['action_text'] = _("View invoice(s)")
        error['action'] = faulty_lines.mapped("move_id")._get_records_action(name=_("Check taxes on invoice lines"))
    return {'l10n_it_edi_move_only_one_vat_tax_per_line': error}
def _l10n_it_edi_get_formatters(self):
def format_alphanumeric(text, maxlen=None):
if not text:
return False
text = text.encode('latin-1', 'replace').decode('latin-1')
if maxlen and maxlen > 0:
text = text[:maxlen]
elif maxlen and maxlen < 0:
text = text[maxlen:]
return text
def format_date(dt):
# Format the date in the italian standard.
dt = dt or datetime.now()
return dt.strftime('%Y-%m-%d')
def format_monetary(number, currency):
# Format the monetary values to avoid trailing decimals (e.g. 90.85000000000001).
return float_repr(number, min(2, currency.decimal_places))
def format_float(amount, precision):
if amount is None or amount is False:
return None
# Avoid things like -0.0, see: https://stackoverflow.com/a/11010869
return '%.*f' % (precision, amount if not float_is_zero(amount, precision_digits=precision) else 0.0)
def format_numbers(number):
#format number to str with between 2 and 8 decimals (event if it's .00)
number_splited = str(number).split('.')
if len(number_splited) == 1:
return "%.02f" % number
cents = number_splited[1]
if len(cents) > 8:
return "%.08f" % number
return float_repr(number, max(2, len(cents)))
def format_numbers_two(number):
#format number to str with 2 (event if it's .00)
return "%.02f" % number
def format_phone(number):
if not number:
return False
number = number.replace(' ', '').replace('/', '').replace('.', '')
if len(number) > 4 and len(number) < 13:
return format_alphanumeric(number)
return False
def format_address(street, street2, maxlen=60):
street, street2 = street or '', street2 or ''
if street and len(street) >= maxlen:
street2 = ''
sep = ' ' if street and street2 else ''
return format_alphanumeric(f"{street}{sep}{street2}", maxlen)
return {
'format_date': format_date,
'format_float': format_float,
'format_monetary': format_monetary,
'format_numbers': format_numbers,
'format_numbers_two': format_numbers_two,
'format_phone': format_phone,
'format_alphanumeric': format_alphanumeric,
'format_address': format_address,
}
def _l10n_it_edi_render_xml(self, pdf_values=None):
    ''' Create the xml file content.

    The simplified template is used when the move is a simplified invoice,
    the regular FatturaPA template otherwise.

    :param pdf_values: forwarded to `_l10n_it_edi_get_values`.
    :return: The XML content as bytestring.
    '''
    if self._l10n_it_edi_is_simplified():
        qweb_template_name = 'l10n_it_edi.account_invoice_it_simplified_FatturaPA_export'
    else:
        qweb_template_name = 'l10n_it_edi.account_invoice_it_FatturaPA_export'

    # Formatters come last so that they win over identically named values.
    rendering_values = {
        **self._l10n_it_edi_get_values(pdf_values),
        **self._l10n_it_edi_get_formatters(),
    }
    rendered = self.env['ir.qweb']._render(qweb_template_name, rendering_values)
    cleaned_node = cleanup_xml_node(rendered, remove_blank_nodes=False)
    return etree.tostring(cleaned_node, xml_declaration=True, encoding='UTF-8')
def _l10n_it_edi_get_attachment_values(self, pdf_values=None):
    """ Build the creation values of the EDI attachment for a single move.

    :param pdf_values: forwarded to the XML rendering.
    :return: dictionary of `ir.attachment` values, with the rendered XML in 'raw'.
    """
    self.ensure_one()
    # The filename is generated before the XML is rendered, as in the values order below.
    filename = self._l10n_it_edi_generate_filename()
    description = _('IT EDI e-move: %s', self.move_type)
    values = {
        'name': filename,
        'type': 'binary',
        'mimetype': 'application/xml',
        'description': description,
        'company_id': self.company_id.id,
        'res_id': self.id,
        'res_model': self._name,
        'res_field': 'l10n_it_edi_attachment_file',
    }
    values['raw'] = self._l10n_it_edi_render_xml(pdf_values=pdf_values)
    return values
def _l10n_it_edi_generate_filename(self):
    '''Returns a name conform to the Fattura pa Specifications:
       See ES documentation 2.2

    The name is made of the company country code, its normalized Codice Fiscale and
    a progressive number: the next value of the company filename sequence written
    in base 62 and left-padded with zeros to 5 characters.
    '''
    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    base = len(alphabet)
    company = self.company_id

    # Each company should have its own filename sequence. If it does not exist, create it
    raw_number = self.env['ir.sequence'].with_company(company).next_by_code('l10n_it_edi.fattura_filename')
    if not raw_number:
        new_sequence = self.env['ir.sequence'].sudo().create({
            'name': 'FatturaPA Filename Sequence',
            'code': 'l10n_it_edi.fattura_filename',
            'company_id': company.id,
            # The offset is used to avoid conflicts with existing filenames
            'number_next': 62 ** 4,
        })
        raw_number = new_sequence._next()

    # The number is returned as a string, but we require an int
    remaining = int(''.join(char for char in raw_number if char.isdecimal()))

    # Base conversion: digits come out least significant first.
    digits = []
    while remaining:
        remaining, index = divmod(remaining, base)
        digits.append(alphabet[index])
    progressive_number = ''.join(reversed(digits))

    return '%(country_code)s%(codice)s_%(progressive_number)s.xml' % {
        'country_code': company.country_id.code,
        'codice': company.partner_id._l10n_it_edi_normalized_codice_fiscale(),
        'progressive_number': progressive_number.zfill(5),
    }
def _l10n_it_edi_send(self, attachments_vals):
    """ Upload the e-invoice files of the moves and record the outcome on each move.

    Moves addressed to the Public Administration are not uploaded: they are flagged
    as requiring the user signature. The other ones are uploaded in one request.

    :param attachments_vals: dictionary mapping each move to its attachment values;
                             the keys 'name' and 'raw' are read.
    :raise UserError: when the upload request fails with an AccountEdiProxyError.
    """
    uploads = []
    moves_by_filename = {}

    # Setup moves for sending
    for move in self:
        vals = attachments_vals[move]
        filename = vals['name']
        encoded_xml = b64encode(vals['raw']).decode()
        move.l10n_it_edi_header = False
        if not move.commercial_partner_id._l10n_it_edi_is_public_administration():
            move.l10n_it_edi_state = 'being_sent'
            uploads.append({'filename': filename, 'xml': encoded_xml})
            moves_by_filename[filename] = move
            continue
        move.l10n_it_edi_state = 'requires_user_signature'
        move.l10n_it_edi_transaction = False
        move.sudo().message_post(body=nl2br(_(
            "Sending invoices to Public Administration partners is not supported.\n"
            "The IT EDI XML file is generated, please sign the document and upload it "
            "through the 'Fatture e Corrispettivi' portal of the Tax Agency."
        )))

    # Upload files
    try:
        results = self._l10n_it_edi_upload(uploads)
    except AccountEdiProxyError as e:
        # Reset every move that was about to be sent, then report all the failures at once.
        failures = []
        for filename, unsent_move in moves_by_filename.items():
            unsent_move.l10n_it_edi_state = False
            text_message = _("Error uploading the e-invoice file %(file)s.\n%(error)s", file=filename, error=e.message)
            unsent_move.l10n_it_edi_header = text_message
            unsent_move.sudo().message_post(body=nl2br(text_message))
            failures.append(text_message)
        raise UserError("\n".join(failures)) from e

    # Handle results
    for filename, vals in results.items():
        sent_move = moves_by_filename[filename]
        if 'error' in vals:
            sent_move.l10n_it_edi_state = False
            sent_move.l10n_it_edi_transaction = False
            message = nl2br(_("Error uploading the e-invoice file %(file)s.\n%(error)s", file=filename, error=vals['error']))
        else:
            sent_move.l10n_it_edi_state = 'processing'
            sent_move.l10n_it_edi_transaction = vals['id_transaction']
            if vals['id_transaction'] == 'demo':
                message = _("We are simulating the sending of the e-invoice file %s, as we are in demo mode.", filename)
            else:
                message = _("The e-invoice file %s was sent to the SdI for processing.", filename)
        sent_move.l10n_it_edi_header = message
        sent_move.sudo().message_post(body=message)
def _l10n_it_edi_upload(self, files):
'''Upload files to the SdI.
:param files: A list of dictionary {filename, base64_xml}.
:returns: A dictionary.
* message: Message from fatturapa.
* transactionId: The fatturapa ID of this request.
* error: An eventual error.
'''
if not files:
return {}
proxy_user = self.company_id.l10n_it_edi_proxy_user_id
proxy_user.ensure_one()
if proxy_user.edi_mode == 'demo':
return {file_data['filename']: {'id_transaction': 'demo'} for file_data in files}
ERRORS = {'EI01': _('Attached file is empty'),
'EI02': _('Service momentarily unavailable'),
'EI03': _('Unauthorized user')}
server_url = proxy_user._get_server_url()
results = proxy_user._make_request(
f'{server_url}/api/l10n_it_edi/1/out/SdiRiceviFile',
params={'files': files})
for filename, vals in results.items():
if 'error' in vals:
results[filename]['error'] = ERRORS.get(vals.get('error'), _("Unknown error"))
return results
# -------------------------------------------------------------------------
# EDI: Update notifications
# -------------------------------------------------------------------------
def _l10n_it_edi_update_send_state(self):
    ''' Check if the current invoices have been processed by the SdI.

    In demo mode every move is directly marked as forwarded, without any request.
    Otherwise the notifications of the moves' transactions are requested from the
    proxy, decrypted when they carry a file, written on each move, and finally the
    transactions flagged for it are acknowledged to the proxy.

    :raise UserError: when a request to the proxy fails with an AccountEdiProxyError.
    '''
    proxy_user = self.company_id.l10n_it_edi_proxy_user_id
    if proxy_user.edi_mode == 'demo':
        for move in self:
            filename = move.l10n_it_edi_attachment_id and move.l10n_it_edi_attachment_id.name or '???'
            # Write on the current move only (not on the whole recordset), as done
            # in the non-demo flow below.
            move._l10n_it_edi_write_send_state(
                transformed_notification={
                    'l10n_it_edi_state': 'forwarded',
                    'l10n_it_edi_transaction': f'demo_{uuid.uuid4()}',
                    'send_ack_to_edi_proxy': False,
                    'date': fields.Date.today(),
                    'filename': filename},
                message=_("The e-invoice file %s has been sent in Demo EDI mode.", filename))
        return

    server_url = proxy_user._get_server_url()
    try:
        notifications = proxy_user._make_request(
            f'{server_url}/api/l10n_it_edi/1/in/TrasmissioneFatture',
            params={'ids_transaction': self.mapped("l10n_it_edi_transaction")})
    except AccountEdiProxyError as pe:
        raise UserError(_("An error occurred while downloading updates from the Proxy Server: (%(code)s) %(message)s", code=pe.code, message=pe.message)) from pe

    # Decrypt the notification files, when present.
    for notification in notifications.values():
        encrypted_update_content = notification.get('file')
        encryption_key = notification.get('key')
        if (encrypted_update_content and encryption_key):
            notification['xml_content'] = proxy_user._decrypt_data(encrypted_update_content, encryption_key)

    transaction_ids_to_ack = []
    states_to_ack = []
    for move in self:
        notification = notifications[move.l10n_it_edi_transaction]
        parsed_notification = move._l10n_it_edi_parse_notification(notification)
        transformed_notification = move._l10n_it_edi_transform_notification(parsed_notification)
        message = move._l10n_it_edi_get_message(transformed_notification)
        move._l10n_it_edi_write_send_state(transformed_notification, message)
        if (
            transformed_notification.get('send_ack_to_edi_proxy')
            and (id_transaction_to_ack := transformed_notification.get('l10n_it_edi_transaction'))
            and (ack_state := transformed_notification.get('l10n_it_edi_state'))
        ):
            transaction_ids_to_ack.append(id_transaction_to_ack)
            states_to_ack.append(ack_state)

    # Only call the proxy when there is something to acknowledge.
    if transaction_ids_to_ack:
        try:
            proxy_user._make_request(
                f'{server_url}/api/l10n_it_edi/1/ack',
                params={'transaction_ids': transaction_ids_to_ack, 'states': states_to_ack})
        except AccountEdiProxyError as pe:
            raise UserError(_("An error occurred while downloading updates from the Proxy Server: (%(code)s) %(message)s", code=pe.code, message=pe.message)) from pe
def _l10n_it_edi_parse_notification(self, notification):
    """ Extract the useful data from a notification returned by the EDI Proxy Server.

    :param notification: dict of the notification. The 'state' key is read (defaults to ''),
                         as well as the optional 'xml_content' key holding the notification XML.
    :return: a dict holding only 'sdi_state' when there is no XML content, otherwise
             'sdi_state', 'errors' (list of (code, description) tuples, one per 'Errore'
             element), 'outcome', 'date' and 'filename'.
    """
    sdi_state = notification.get('state', '')
    if not (xml_content := notification.get('xml_content')):
        return {'sdi_state': sdi_state}
    decrypted_update_content = etree.fromstring(xml_content)
    outcome = get_text(decrypted_update_content, './/Esito')
    # Fall back on today when the arrival datetime is missing or can't be parsed.
    date_arrival = get_datetime(decrypted_update_content, './/DataOraRicezione') or fields.Date.today()
    # The paths must be relative to each error element ('.//'): an absolute '//' path is
    # evaluated from the document root, so every error would be given the code and the
    # description of the first error of the document.
    errors = [(
        get_text(error_element, './/Codice'),
        get_text(error_element, './/Descrizione'),
    ) for error_element in decrypted_update_content.xpath('//Errore')]
    filename = get_text(decrypted_update_content, './/NomeFile')
    return {
        'sdi_state': sdi_state,
        'errors': errors,
        'outcome': outcome,
        'date': date_arrival,
        'filename': filename,
    }
def _l10n_it_edi_transform_notification(self, parsed_notification):
    """ Complete a parsed notification with the values to apply on the move.

    The SdI state (paired with the outcome when there is one) is mapped to the
    new `l10n_it_edi_state`. From that state are derived:
    - whether the transaction id is kept (it is reset to False when the state
      is unknown/not found or 'rejected');
    - whether an ack is to be sent to the EDI Proxy Server (any truthy new state).

    :param parsed_notification: dict with at least 'sdi_state'; optional
                                'filename', 'errors', 'date' and 'outcome'.
    :return: the same dict, updated in place.
    """
    self.ensure_one()
    state_map = {
        'not_found': False,
        'awaiting_outcome': 'processing',
        'notificaScarto': 'rejected',
        'ricevutaConsegna': 'forwarded',
        'forward_attempt': 'forward_attempt',
        'notificaMancataConsegna': 'forward_failed',
        ('notificaEsito', 'EC01'): 'accepted_by_pa_partner',
        ('notificaEsito', 'EC02'): 'rejected_by_pa_partner',
        'notificaDecorrenzaTermini': 'accepted_by_pa_partner_after_expiry',
    }
    sdi_state = parsed_notification['sdi_state']
    filename = parsed_notification.get('filename')
    errors = parsed_notification.get('errors', [])
    date = parsed_notification.get('date', fields.Date.today())

    # Without a filename in the notification, use the one of the move's EDI attachment.
    if not filename:
        attachment = self.l10n_it_edi_attachment_id
        if attachment:
            filename = attachment.name

    # Notifications carrying an outcome are keyed by the (state, outcome) pair.
    outcome = parsed_notification.get('outcome', False)
    state_key = (sdi_state, outcome) if outcome else sdi_state
    new_state = state_map.get(state_key, False)

    keep_transaction = new_state not in (False, 'rejected')
    parsed_notification.update({
        'l10n_it_edi_state': new_state,
        'l10n_it_edi_transaction': self.l10n_it_edi_transaction if keep_transaction else False,
        'send_ack_to_edi_proxy': bool(new_state),
        'date': date,
        'errors': errors,
        'filename': filename,
    })
    return parsed_notification
def _l10n_it_edi_write_send_state(self, transformed_notification, message):
    """ Store the outcome of a notification on the move, then commit.

    Writes the new EDI state, the transaction id and the header message.
    The message is posted in the chatter only when there is one and the
    state actually changed. On rejection, the EDI attachment file is emptied.

    :param transformed_notification: dict with the 'l10n_it_edi_state' and
                                     'l10n_it_edi_transaction' keys.
    :param message: the message to store and post, may be falsy.
    """
    self.ensure_one()
    previous_state = self.l10n_it_edi_state
    state = transformed_notification['l10n_it_edi_state']
    values = {
        'l10n_it_edi_state': state,
        'l10n_it_edi_transaction': transformed_notification['l10n_it_edi_transaction'],
        'l10n_it_edi_header': message or False,
    }
    self.write(values)

    state_changed = previous_state != state
    if message and state_changed:
        poster = self.with_context(no_new_invoice=True).sudo()
        poster.message_post(body=message)

    if state == 'rejected':
        self.l10n_it_edi_attachment_file = False

    # Make the update persistent right away.
    self.env.cr.commit()
def _l10n_it_edi_get_message(self, transformed_notification):
    """ Build the message describing the new EDI state of the move.

    Three cases are handled, in this order:
    - 'rejected': the SdI errors are listed, with a friendlier wording for the
      duplicated move / duplicated filename error codes;
    - the commercial partner's `_l10n_it_edi_is_public_administration()` is truthy:
      a PA-specific message is used if the state has one;
    - otherwise (or if there is no PA-specific message): the generic message.

    :param transformed_notification: dict with the 'filename' and 'l10n_it_edi_state'
                                     keys, and 'errors' when the state is 'rejected'.
    :return: the message, or None when the state has no message.
    """
    self.ensure_one()
    commercial_partner = self.commercial_partner_id
    partner_name = commercial_partner.display_name
    filename = transformed_notification['filename']
    state = transformed_notification['l10n_it_edi_state']

    if state == 'rejected':
        DUPLICATE_MOVE = '00404'
        DUPLICATE_FILENAME = '00002'
        descriptions = []
        for code, sdi_description in transformed_notification['errors']:
            description = sdi_description
            if code == DUPLICATE_MOVE:
                description = _(
                    "The e-invoice file %(file)s is duplicated.\n"
                    "Original message from the SdI: %(message)s",
                    file=filename, message=sdi_description)
            elif code == DUPLICATE_FILENAME:
                description = _(
                    "The e-invoice filename %(file)s is duplicated. Please check the FatturaPA Filename sequence.\n"
                    "Original message from the SdI: %(message)s",
                    file=filename, message=sdi_description)
            descriptions.append(description)
        title = _('The e-invoice has been refused by the SdI.')
        return self._l10n_it_edi_format_errors(title, descriptions)

    if commercial_partner._l10n_it_edi_is_public_administration():
        pa_messages = {
            'forwarded': nl2br(_(
                "The e-invoice file %(file)s was succesfully sent to the SdI.\n"
                "%(partner)s has 15 days to accept or reject it.",
                file=filename, partner=partner_name)),
            'forward_attempt': nl2br(_(
                "The e-invoice file %(file)s can't be forward to %(partner)s (Public Administration) by the SdI at the moment.\n"
                "It will try again for 10 days, after which it will be considered accepted, but "
                "you will still have to send it by post or e-mail.",
                file=filename, partner=partner_name)),
            'accepted_by_pa_partner_after_expiry': nl2br(_(
                "The e-invoice file %(file)s is succesfully sent to the SdI. The invoice is now considered fiscally relevant.\n"
                "The %(partner)s (Public Administration) had 15 days to either accept or refused this document,"
                "but since they did not reply, it's now considered accepted.",
                file=filename, partner=partner_name)),
            'rejected_by_pa_partner': nl2br(_(
                "The e-invoice file %(file)s has been refused by %(partner)s (Public Administration).\n"
                "You have 5 days from now to issue a full refund for this invoice, "
                "then contact the PA partner to create a new one according to their "
                "requests and submit it.",
                file=filename, partner=partner_name)),
            'accepted_by_pa_partner': _(
                "The e-invoice file %(file)s has been accepted by %(partner)s (Public Administration), a payment will be issued soon",
                file=filename, partner=partner_name),
        }
        pa_message = pa_messages.get(state)
        if pa_message:
            return pa_message

    # Generic messages, also used for PA partners when the state has no specific one.
    generic_messages = {
        False: _(
            "The e-invoice file %s has not been found on the EDI Proxy server.", filename),
        'processing': nl2br(_(
            "The e-invoice file %s was sent to the SdI for validation.\n"
            "It is not yet considered accepted, please wait further notifications.",
            filename)),
        'forwarded': _(
            "The e-invoice file %(file)s was accepted and succesfully forwarded it to %(partner)s by the SdI.",
            file=filename, partner=partner_name),
        'forward_attempt': nl2br(_(
            "The e-invoice file %(file)s has been accepted by the SdI.\n"
            "The SdI is trying to forward it to %(partner)s.\n"
            "It will try for up to 2 days, after which you'll eventually "
            "need to send it the invoice to the partner by post or e-mail.",
            file=filename, partner=partner_name)),
        'forward_failed': nl2br(_(
            "The e-invoice file %(file)s couldn't be forwarded to %(partner)s.\n"
            "Please remember to send it via post or e-mail.",
            file=filename, partner=partner_name))
    }
    return generic_messages.get(state)