1822 lines
89 KiB
Python
1822 lines
89 KiB
Python
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
from base64 import b64encode
|
|
from datetime import datetime
|
|
import logging
|
|
from lxml import etree
|
|
import uuid
|
|
|
|
from odoo import _, api, Command, fields, models, modules
|
|
from odoo.addons.base.models.ir_qweb_fields import Markup, nl2br, nl2br_enclose
|
|
from odoo.addons.account_edi_proxy_client.models.account_edi_proxy_user import AccountEdiProxyError
|
|
from odoo.exceptions import UserError
|
|
from odoo.tools import float_compare, float_repr, cleanup_xml_node, float_is_zero
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
WAITING_STATES = ('being_sent', 'processing', 'forward_attempt')
|
|
|
|
|
|
# -------------------------------------------------------------------------
|
|
# XML tool functions
|
|
# -------------------------------------------------------------------------
|
|
|
|
def get_text(tree, xpath, many=False):
|
|
texts = [el.text.strip() for el in tree.xpath(xpath) if el.text]
|
|
return texts if many else texts[0] if texts else ''
|
|
|
|
def get_float(tree, xpath):
|
|
try:
|
|
return float(get_text(tree, xpath))
|
|
except ValueError:
|
|
return 0.0
|
|
|
|
def get_date(tree, xpath):
|
|
""" Dates in FatturaPA are ISO 8601 date format, pattern '[-]CCYY-MM-DD[Z|(+|-)hh:mm]' """
|
|
dt = get_datetime(tree, xpath)
|
|
return dt.date() if dt else False
|
|
|
|
def get_datetime(tree, xpath):
|
|
""" Datetimes in FatturaPA are ISO 8601 date format, pattern '[-]CCYY-MM-DDThh:mm:ss[Z|(+|-)hh:mm]'
|
|
Python 3.7 -> 3.11 doesn't support 'Z'.
|
|
"""
|
|
if datetime_str := get_text(tree, xpath):
|
|
try:
|
|
return datetime.fromisoformat(datetime_str.replace('Z', '+00:00'))
|
|
except (ValueError, TypeError):
|
|
return False
|
|
return False
|
|
|
|
|
|
class AccountMove(models.Model):
|
|
_inherit = 'account.move'
|
|
|
|
l10n_it_edi_state = fields.Selection(
|
|
string="SDI State",
|
|
selection=[
|
|
('being_sent', 'Being Sent To SdI'),
|
|
('requires_user_signature', 'Requires user signature'),
|
|
('processing', 'SdI Processing'),
|
|
('rejected', 'SdI Rejected'),
|
|
('forwarded', 'SdI Accepted, Forwarded to Partner'),
|
|
('forward_failed', 'SdI Accepted, Forward to Partner Failed'),
|
|
('forward_attempt', 'SdI Accepted, Forwarding to Partner'),
|
|
('accepted_by_pa_partner', 'SdI Accepted, Accepted by the PA Partner'),
|
|
('rejected_by_pa_partner', 'SdI Accepted, Rejected by the PA Partner'),
|
|
('accepted_by_pa_partner_after_expiry', 'SdI Accepted, PA Partner Expired Terms'),
|
|
],
|
|
copy=False, tracking=True,
|
|
help="This state is updated by default, but you can force the value. ",
|
|
)
|
|
l10n_it_edi_header = fields.Html(
|
|
help='User description of the current state, with hints to make the flow progress',
|
|
readonly=True,
|
|
copy=False,
|
|
)
|
|
l10n_it_edi_transaction = fields.Char(copy=False, string="FatturaPA Transaction")
|
|
l10n_it_edi_attachment_file = fields.Binary(copy=False, attachment=True)
|
|
l10n_it_edi_attachment_id = fields.Many2one(
|
|
comodel_name='ir.attachment',
|
|
string="FatturaPA Attachment",
|
|
compute=lambda self: self._compute_linked_attachment_id('l10n_it_edi_attachment_id', 'l10n_it_edi_attachment_file'),
|
|
depends=['l10n_it_edi_attachment_file'],
|
|
)
|
|
l10n_it_edi_is_self_invoice = fields.Boolean(compute="_compute_l10n_it_edi_is_self_invoice")
|
|
l10n_it_stamp_duty = fields.Float(string="Dati Bollo")
|
|
l10n_it_ddt_id = fields.Many2one('l10n_it.ddt', string='DDT', copy=False)
|
|
|
|
l10n_it_origin_document_type = fields.Selection(
|
|
string="Origin Document Type",
|
|
selection=[('purchase_order', 'Purchase Order'), ('contract', 'Contract'), ('agreement', 'Agreement')],
|
|
copy=False)
|
|
l10n_it_origin_document_name = fields.Char(
|
|
string="Origin Document Name",
|
|
copy=False)
|
|
l10n_it_origin_document_date = fields.Date(
|
|
string="Origin Document Date",
|
|
copy=False)
|
|
l10n_it_cig = fields.Char(
|
|
string="CIG",
|
|
copy=False,
|
|
help="Tender Unique Identifier")
|
|
l10n_it_cup = fields.Char(
|
|
string="CUP",
|
|
copy=False,
|
|
help="Public Investment Unique Identifier")
|
|
# Technical field for showing the above fields or not
|
|
l10n_it_partner_pa = fields.Boolean(compute='_compute_l10n_it_partner_pa')
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Computes
|
|
# -------------------------------------------------------------------------
|
|
|
|
@api.depends('commercial_partner_id.l10n_it_pa_index', 'company_id')
|
|
def _compute_l10n_it_partner_pa(self):
|
|
for move in self:
|
|
partner = move.commercial_partner_id
|
|
move.l10n_it_partner_pa = partner and (partner._l10n_it_edi_is_public_administration() or len(partner.l10n_it_pa_index or '') == 7)
|
|
|
|
@api.depends('move_type', 'line_ids.tax_tag_ids')
|
|
def _compute_l10n_it_edi_is_self_invoice(self):
|
|
"""
|
|
Italian EDI requires Vendor bills coming from EU countries to be sent as self-invoices.
|
|
We recognize these cases based on the taxes that target the VJ tax grids, which imply
|
|
the use of VAT External Reverse Charge.
|
|
"""
|
|
purchases = self.filtered(lambda m: m.is_purchase_document())
|
|
others = self - purchases
|
|
for move in others:
|
|
move.l10n_it_edi_is_self_invoice = False
|
|
if purchases:
|
|
it_tax_report_vj_lines = self.env['account.report.line'].sudo().search([
|
|
('report_id.country_id.code', '=', 'IT'),
|
|
('code', '=like', 'VJ%')
|
|
])
|
|
vj_lines_tags = it_tax_report_vj_lines.expression_ids._get_matching_tags()
|
|
for move in purchases:
|
|
invoice_lines_tags = move.line_ids.tax_tag_ids
|
|
ids_intersection = set(invoice_lines_tags.ids) & set(vj_lines_tags.ids)
|
|
move.l10n_it_edi_is_self_invoice = bool(ids_intersection)
|
|
|
|
def _l10n_it_edi_exempt_reason_tag_mapping(self):
|
|
return {
|
|
"N3.2": "VJ3",
|
|
"N3.3": "VJ1",
|
|
"N6.1": "VJ6",
|
|
"N6.2": "VJ7",
|
|
"N6.3": "VJ12",
|
|
"N6.4": "VJ13",
|
|
"N6.5": "VJ14",
|
|
"N6.6": "VJ15",
|
|
"N6.7": "VJ16",
|
|
"N6.8": "VJ17",
|
|
}
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Overrides
|
|
# -------------------------------------------------------------------------
|
|
|
|
@api.depends('l10n_it_edi_transaction')
|
|
def _compute_show_reset_to_draft_button(self):
|
|
# EXTENDS 'account'
|
|
super()._compute_show_reset_to_draft_button()
|
|
for move in self:
|
|
move.show_reset_to_draft_button = not move.l10n_it_edi_transaction and move.show_reset_to_draft_button
|
|
|
|
def _get_edi_decoder(self, file_data, new=False):
|
|
# EXTENDS 'account'
|
|
if file_data['type'] == 'l10n_it_edi':
|
|
return self._l10n_it_edi_import_invoice
|
|
return super()._get_edi_decoder(file_data, new=new)
|
|
|
|
def _post(self, soft=True):
|
|
# EXTENDS 'account'
|
|
self.with_context(skip_is_manually_modified=True).write({'l10n_it_edi_header': False})
|
|
return super()._post(soft)
|
|
|
|
def _extend_with_attachments(self, attachments, new=False):
|
|
result = False
|
|
# Prediction is an enterprise feature.
|
|
if self._is_prediction_enabled():
|
|
# Italy needs a custom order in prediction, since prediction generally deduces taxes
|
|
# from products, while in Italian EDI, taxes are generally explicited in the XML file
|
|
# while the product may not be labelled exactly the same as in the database
|
|
l10n_it_attachments = attachments.filtered(lambda rec: rec._is_l10n_it_edi_import_file())
|
|
if l10n_it_attachments:
|
|
attachments = attachments - l10n_it_attachments
|
|
result = super(AccountMove, self.with_context(disable_onchange_name_predictive=True))._extend_with_attachments(l10n_it_attachments, new)
|
|
return result or super()._extend_with_attachments(attachments, new)
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Business actions
|
|
# -------------------------------------------------------------------------
|
|
|
|
def action_l10n_it_edi_send(self):
|
|
""" Checks that the invoice data is coherent.
|
|
Attaches the XML file to the invoice.
|
|
Sends the invoice to the SdI.
|
|
"""
|
|
self.ensure_one()
|
|
|
|
if errors := self._l10n_it_edi_export_data_check():
|
|
messages = []
|
|
for error_key, error_data in errors.items():
|
|
message = error_data['message']
|
|
split = error_key.split("_")
|
|
if len(split) > 3 and (model_id := {
|
|
'partner': 'res.partner',
|
|
'move': 'account.move',
|
|
'company': 'res.company'
|
|
}.get(split[3], None)):
|
|
if action := error_data.get('action'):
|
|
if 'res_id' in action:
|
|
record_ids = [action['res_id']]
|
|
else:
|
|
record_ids = action['domain'][0][2]
|
|
records = self.env[model_id].browse(record_ids)
|
|
message = f"{message} - {', '.join(records.mapped('display_name'))}"
|
|
messages.append(nl2br(message))
|
|
|
|
# Update the vendor bill's header with the warning messages,
|
|
# and force reload the view to make sure the header is loaded
|
|
self.l10n_it_edi_header = Markup('<br/>').join(messages)
|
|
return {
|
|
'type': 'ir.actions.client',
|
|
'tag': 'reload',
|
|
}
|
|
|
|
attachment_vals = self._l10n_it_edi_get_attachment_values(pdf_values=None)
|
|
self.env['ir.attachment'].create(attachment_vals)
|
|
self.invalidate_recordset(fnames=['l10n_it_edi_attachment_id', 'l10n_it_edi_attachment_file'])
|
|
self.message_post(attachment_ids=self.l10n_it_edi_attachment_id.ids)
|
|
self._l10n_it_edi_send({self: attachment_vals})
|
|
self.is_move_sent = True
|
|
|
|
def action_check_l10n_it_edi(self):
|
|
self.ensure_one()
|
|
if not self.l10n_it_edi_transaction and self.l10n_it_edi_state not in WAITING_STATES:
|
|
raise UserError(_("This move is not waiting for updates from the SdI."))
|
|
if self.l10n_it_edi_state == 'being_sent':
|
|
return {'type': 'ir.actions.client', 'tag': 'reload'}
|
|
self._l10n_it_edi_update_send_state()
|
|
|
|
def button_draft(self):
|
|
# EXTENDS 'account'
|
|
for move in self:
|
|
move.l10n_it_edi_state = False
|
|
return super().button_draft()
|
|
|
|
# -------------------------------------------------------------------------
|
|
# Helpers
|
|
# -------------------------------------------------------------------------
|
|
|
|
def _l10n_it_edi_ready_for_xml_export(self):
|
|
self.ensure_one()
|
|
return (
|
|
self.state == 'posted'
|
|
and self.company_id.account_fiscal_country_id.code == 'IT'
|
|
and self.journal_id.type == 'sale'
|
|
and self.l10n_it_edi_state in (False, 'rejected')
|
|
)
|
|
|
|
def _l10n_it_edi_add_base_lines_xml_values(self, base_lines_aggregated_values, is_downpayment):
|
|
self.ensure_one()
|
|
quantita_pd = min(self.env['account.move.line']._fields['quantity'].get_digits(self.env)[1], 8)
|
|
for index, (base_line, aggregated_values) in enumerate(base_lines_aggregated_values, start=1):
|
|
line = base_line['record']
|
|
tax_details = base_line['tax_details']
|
|
discount = base_line['discount']
|
|
quantity = base_line['quantity']
|
|
price_subtotal = base_line['price_subtotal'] = tax_details['raw_total_excluded_currency']
|
|
it_values = base_line['it_values'] = {}
|
|
|
|
# Description.
|
|
# Down payment lines:
|
|
# If there was a down paid amount that has been deducted from this move,
|
|
# we need to put a reference to the down payment invoice in the DatiFattureCollegate tag
|
|
description = line.name
|
|
if not is_downpayment and price_subtotal < 0:
|
|
downpayment_moves = line._get_downpayment_lines().move_id
|
|
if downpayment_moves:
|
|
downpayment_moves_description = ', '.join(downpayment_moves.mapped('name'))
|
|
sep = ', ' if description else ''
|
|
description = f"{description}{sep}{downpayment_moves_description}"
|
|
description = description or "NO NAME"
|
|
|
|
# Price unit.
|
|
if quantity:
|
|
it_values['prezzo_unitario'] = base_line['gross_price_subtotal'] / quantity
|
|
else:
|
|
it_values['prezzo_unitario'] = 0.0
|
|
|
|
# Discount.
|
|
discount_list = it_values['sconto_maggiorazione_list'] = []
|
|
delta_discount = base_line.get('discount_amount', 0.0) - base_line.get('discount_amount_before_dispatching', 0.0)
|
|
if discount:
|
|
discount_list.append({
|
|
'tipo': 'SC' if discount > 0 else 'MG',
|
|
'percentuale': abs(discount),
|
|
'importo': None,
|
|
})
|
|
if not base_line['currency_id'].is_zero(delta_discount):
|
|
discount_list.append({
|
|
'tipo': 'SC',
|
|
'percentuale': None,
|
|
'importo': abs(delta_discount / (quantity or 1.0)),
|
|
})
|
|
|
|
# Tax rates.
|
|
rates = it_values['aliquota_iva_list'] = []
|
|
for values in aggregated_values.values():
|
|
grouping_key = values['grouping_key']
|
|
if not grouping_key or grouping_key['skip']:
|
|
continue
|
|
|
|
rates.append(grouping_key['tax_amount_field'] if grouping_key['tax_amount_type_field'] == 'percent' else 0.0)
|
|
|
|
# Tax exempt reason.
|
|
vat_tax = base_line['tax_ids'].flatten_taxes_hierarchy().filtered(lambda t: t._l10n_it_filter_kind('vat') and t.amount >= 0)[:1]
|
|
it_values['natura'] = vat_tax.l10n_it_exempt_reason or None
|
|
|
|
# Other data.
|
|
other_data_list = it_values['altri_dati_gestionali_list'] = []
|
|
if base_line['currency_id'] != self.company_currency_id:
|
|
other_data_list.extend([
|
|
{
|
|
'tipo_dato': 'DIVISA',
|
|
'riferimento_testo': base_line['currency_id'].name,
|
|
'riferimento_numero': tax_details['raw_total_excluded_currency'],
|
|
'riferimento_data': None,
|
|
},
|
|
{
|
|
'tipo_dato': 'CAMBIO',
|
|
'riferimento_testo': None,
|
|
'riferimento_numero': base_line['rate'],
|
|
'riferimento_data': self.invoice_date,
|
|
},
|
|
])
|
|
|
|
it_values.update({
|
|
'numero_linea': index,
|
|
'descrizione': description,
|
|
'prezzo_totale': tax_details['raw_total_excluded'],
|
|
'quantita': quantity,
|
|
'quantita_pd': quantita_pd,
|
|
'ritenuta': None,
|
|
})
|
|
|
|
def _l10n_it_edi_get_tax_lines_xml_values(self, base_lines_aggregated_values, values_per_grouping_key):
|
|
self.ensure_one()
|
|
tax_lines = []
|
|
for values in values_per_grouping_key.values():
|
|
grouping_key = values['grouping_key']
|
|
if not grouping_key or grouping_key['skip']:
|
|
continue
|
|
|
|
rounding = values['base_amount']
|
|
for _base_line, aggregated_values in base_lines_aggregated_values:
|
|
if grouping_key in aggregated_values:
|
|
rounding -= aggregated_values[grouping_key]['raw_base_amount']
|
|
if float_is_zero(rounding, precision_digits=8):
|
|
rounding = None
|
|
|
|
tax_lines.append({
|
|
'aliquota_iva': grouping_key['tax_amount_field'],
|
|
'natura': grouping_key['l10n_it_exempt_reason'],
|
|
'arrotondamento': rounding,
|
|
'imponibile_importo': values['base_amount'],
|
|
'imposta': values['tax_amount'],
|
|
'esigibilita_iva': grouping_key['tax_exigibility_code'],
|
|
'riferimento_normativo': grouping_key['l10n_it_law_reference'],
|
|
})
|
|
return tax_lines
|
|
|
|
@api.model
|
|
def _l10n_it_edi_is_neg_split_payment(self, tax_data):
|
|
tax = tax_data['tax']
|
|
return (
|
|
tax.amount < 0.0
|
|
and tax_data['group']
|
|
and any(child_tax._l10n_it_is_split_payment() for child_tax in tax_data['group'].children_tax_ids)
|
|
)
|
|
|
|
@api.model
|
|
def _l10n_it_edi_grouping_function_base_lines(self, base_line, tax_data):
|
|
tax = tax_data['tax']
|
|
return {
|
|
'tax_amount_field': -23.0 if tax.amount == -11.5 else tax.amount,
|
|
'tax_amount_type_field': tax.amount_type,
|
|
'skip': tax_data['is_reverse_charge'] or self._l10n_it_edi_is_neg_split_payment(tax_data),
|
|
}
|
|
|
|
@api.model
|
|
def _l10n_it_edi_grouping_function_tax_lines(self, base_line, tax_data):
|
|
tax = tax_data['tax']
|
|
|
|
if tax._l10n_it_is_split_payment():
|
|
tax_exigibility_code = 'S'
|
|
elif tax.tax_exigibility == 'on_payment':
|
|
tax_exigibility_code = 'D'
|
|
elif tax.tax_exigibility == 'on_invoice':
|
|
tax_exigibility_code = 'I'
|
|
else:
|
|
tax_exigibility_code = None
|
|
|
|
return {
|
|
'tax_amount_field': -23.0 if tax.amount == -11.5 else tax.amount,
|
|
'l10n_it_exempt_reason': tax.l10n_it_exempt_reason,
|
|
'l10n_it_law_reference': tax.l10n_it_law_reference,
|
|
'tax_exigibility_code': tax_exigibility_code,
|
|
'tax_amount_type_field': tax.amount_type,
|
|
'skip': tax_data['is_reverse_charge'] or self._l10n_it_edi_is_neg_split_payment(tax_data),
|
|
}
|
|
|
|
@api.model
|
|
def _l10n_it_edi_grouping_function_total(self, base_line, tax_data):
|
|
skip = tax_data['is_reverse_charge'] or self._l10n_it_edi_is_neg_split_payment(tax_data)
|
|
return not skip
|
|
|
|
def _l10n_it_edi_get_values(self, pdf_values=None):
|
|
self.ensure_one()
|
|
|
|
# Flags
|
|
is_self_invoice = self.l10n_it_edi_is_self_invoice
|
|
document_type = self._l10n_it_edi_get_document_type()
|
|
|
|
# Represent if the document is a reverse charge refund in a single variable
|
|
reverse_charge = document_type in ['TD16', 'TD17', 'TD18', 'TD19']
|
|
is_downpayment = document_type in ['TD02']
|
|
reverse_charge_refund = self.move_type == 'in_refund' and reverse_charge
|
|
convert_to_euros = self.currency_id.name != 'EUR'
|
|
|
|
# Base lines.
|
|
base_amls = self.line_ids.filtered(lambda x: x.display_type == 'product')
|
|
base_lines = [self._prepare_product_base_line_for_taxes_computation(x) for x in base_amls]
|
|
tax_amls = self.line_ids.filtered(lambda x: x.display_type == 'tax')
|
|
tax_lines = [self._prepare_tax_line_for_taxes_computation(x) for x in tax_amls]
|
|
|
|
if reverse_charge_refund:
|
|
for base_line in base_lines:
|
|
base_line['price_unit'] *= -1
|
|
|
|
AccountTax = self.env['account.tax']
|
|
AccountTax._add_tax_details_in_base_lines(base_lines, self.company_id)
|
|
|
|
downpayment_lines = []
|
|
# Prepare for '_dispatch_negative_lines'
|
|
for base_line in base_lines:
|
|
tax_details = base_line['tax_details']
|
|
discount = base_line['discount']
|
|
price_unit = base_line['price_unit']
|
|
quantity = base_line['quantity']
|
|
price_subtotal = base_line['price_subtotal'] = tax_details['raw_total_excluded_currency']
|
|
|
|
if discount == 100.0:
|
|
gross_price_subtotal_before_discount = price_unit * quantity
|
|
else:
|
|
gross_price_subtotal_before_discount = price_subtotal / (1 - discount / 100.0)
|
|
|
|
base_line['gross_price_subtotal'] = gross_price_subtotal_before_discount
|
|
base_line['discount_amount_before_dispatching'] = gross_price_subtotal_before_discount - price_subtotal
|
|
|
|
# The tax "23% Ritenuta Agenti e Rappresentanti" is not supported because it's supposed to be a tax of 23% based on
|
|
# 50% of the base amount. It's currently implemented as a -11.5% tax. So on 1000, it gives an amount of -115.
|
|
# We need to fix the base amount from 1000 to 500.0.
|
|
for tax_data in tax_details['taxes_data']:
|
|
tax = tax_data['tax']
|
|
tax_data['_tax_amount'] = tax.amount
|
|
if tax.amount == -11.5:
|
|
tax_data['_tax_amount'] = -23.0
|
|
tax_data['raw_base_amount'] *= 0.5
|
|
tax_data['raw_base_amount_currency'] *= 0.5
|
|
|
|
if not is_downpayment:
|
|
# Negative lines linked to down payment should stay negative
|
|
line = base_line['record']
|
|
if line.price_subtotal < 0 and line._get_downpayment_lines():
|
|
downpayment_lines.append(base_line)
|
|
base_lines.remove(base_line)
|
|
|
|
if float_compare(quantity, 0, 2) < 0:
|
|
# Negative quantity is refused by SDI, so we invert quantity and price_unit to keep the price_subtotal
|
|
base_line.update({
|
|
'quantity': -quantity,
|
|
'price_unit': -price_unit,
|
|
})
|
|
|
|
dispatched_results = self.env['account.tax']._dispatch_negative_lines(base_lines)
|
|
base_lines = dispatched_results['result_lines'] + dispatched_results['orphan_negative_lines'] + downpayment_lines
|
|
AccountTax._round_base_lines_tax_details(base_lines, self.company_id, tax_lines=tax_lines)
|
|
base_lines_aggregated_values = AccountTax._aggregate_base_lines_tax_details(base_lines, self._l10n_it_edi_grouping_function_base_lines)
|
|
self._l10n_it_edi_add_base_lines_xml_values(base_lines_aggregated_values, is_downpayment)
|
|
base_lines = sorted(base_lines, key=lambda base_line: base_line['it_values']['numero_linea'])
|
|
|
|
# Tax lines.
|
|
base_lines_aggregated_values = AccountTax._aggregate_base_lines_tax_details(base_lines, self._l10n_it_edi_grouping_function_tax_lines)
|
|
values_per_grouping_key = AccountTax._aggregate_base_lines_aggregated_values(base_lines_aggregated_values)
|
|
tax_lines = self._l10n_it_edi_get_tax_lines_xml_values(base_lines_aggregated_values, values_per_grouping_key)
|
|
|
|
# Total of the document.
|
|
base_lines_aggregated_values = AccountTax._aggregate_base_lines_tax_details(base_lines, self._l10n_it_edi_grouping_function_total)
|
|
values_per_grouping_key = AccountTax._aggregate_base_lines_aggregated_values(base_lines_aggregated_values)
|
|
importo_totale_documento = 0.0
|
|
for values in values_per_grouping_key.values():
|
|
grouping_key = values['grouping_key']
|
|
if grouping_key is False:
|
|
continue
|
|
importo_totale_documento += values['base_amount_currency']
|
|
importo_totale_documento += values['tax_amount_currency']
|
|
|
|
company = self.company_id
|
|
partner = self.commercial_partner_id
|
|
sender = company
|
|
buyer = partner if not is_self_invoice else company
|
|
seller = company if not is_self_invoice else partner
|
|
sender_info_values = company.partner_id._l10n_it_edi_get_values()
|
|
buyer_info_values = (partner if not is_self_invoice else company.partner_id)._l10n_it_edi_get_values()
|
|
seller_info_values = (company.partner_id if not is_self_invoice else partner)._l10n_it_edi_get_values()
|
|
representative_info_values = company.l10n_it_tax_representative_partner_id._l10n_it_edi_get_values()
|
|
|
|
if self._l10n_it_edi_is_simplified_document_type(document_type):
|
|
formato_trasmissione = "FSM10"
|
|
elif partner._l10n_it_edi_is_public_administration():
|
|
formato_trasmissione = "FPA12"
|
|
else:
|
|
formato_trasmissione = "FPR12"
|
|
|
|
# Reference line for finding the conversion rate used in the document
|
|
conversion_rate = float_repr(
|
|
abs(self.amount_total / self.amount_total_signed), precision_digits=5,
|
|
) if convert_to_euros and self.invoice_line_ids else None
|
|
|
|
# Reduce downpayment views to a single recordset
|
|
downpayment_moves = self.invoice_line_ids._get_downpayment_lines().move_id
|
|
|
|
return {
|
|
'record': self,
|
|
'base_lines': base_lines,
|
|
'tax_lines': tax_lines,
|
|
'importo_totale_documento': importo_totale_documento,
|
|
'company': company,
|
|
'partner': partner,
|
|
'sender': sender,
|
|
'buyer': buyer,
|
|
'seller': seller,
|
|
'representative': company.l10n_it_tax_representative_partner_id,
|
|
'sender_info': sender_info_values,
|
|
'buyer_info': buyer_info_values,
|
|
'seller_info': seller_info_values,
|
|
'representative_info': representative_info_values,
|
|
'origin_document_type': self.l10n_it_origin_document_type,
|
|
'origin_document_name': self.l10n_it_origin_document_name,
|
|
'origin_document_date': self.l10n_it_origin_document_date,
|
|
'cig': self.l10n_it_cig,
|
|
'cup': self.l10n_it_cup,
|
|
'currency': self.currency_id or self.company_currency_id if not convert_to_euros else self.env.ref('base.EUR'),
|
|
'regime_fiscale': company.l10n_it_tax_system if not is_self_invoice else 'RF18',
|
|
'is_self_invoice': is_self_invoice,
|
|
'partner_bank': self.partner_bank_id,
|
|
'formato_trasmissione': formato_trasmissione,
|
|
'document_type': document_type,
|
|
'payment_method': 'MP05',
|
|
'downpayment_moves': downpayment_moves,
|
|
'reconciled_moves': self._get_reconciled_invoices(),
|
|
'rc_refund': reverse_charge_refund,
|
|
'conversion_rate': conversion_rate,
|
|
'balance_multiplicator': -1 if self.is_inbound() else 1,
|
|
'abs': abs,
|
|
'pdf_name': pdf_values['name'] if pdf_values else False,
|
|
'pdf': b64encode(pdf_values['raw']).decode() if pdf_values else False,
|
|
}
|
|
|
|
def _l10n_it_edi_services_or_goods(self):
|
|
"""
|
|
Services and goods have different tax grids when VAT is Reverse Charged, and they can't
|
|
be mixed in the same invoice, because the TipoDocumento depends on which which kind
|
|
of product is bought and it's unambiguous.
|
|
"""
|
|
self.ensure_one()
|
|
scopes = []
|
|
for line in self.invoice_line_ids.filtered(lambda l: l.display_type not in ('line_note', 'line_section')):
|
|
tax_ids_with_tax_scope = line.tax_ids.filtered(lambda x: x.tax_scope)
|
|
if tax_ids_with_tax_scope:
|
|
scopes += tax_ids_with_tax_scope.mapped('tax_scope')
|
|
else:
|
|
scopes.append(line.product_id and line.product_id.type == 'service' and 'service' or 'consu')
|
|
|
|
if set(scopes) == {'consu', 'service'}:
|
|
return "both"
|
|
return scopes and scopes.pop()
|
|
|
|
def _l10n_it_edi_goods_in_italy(self):
|
|
"""
|
|
There is a specific TipoDocumento (Document Type TD19) and tax grid (VJ3) for goods
|
|
that are phisically in Italy but are in a VAT deposit, meaning that the goods
|
|
have not passed customs.
|
|
"""
|
|
self.ensure_one()
|
|
invoice_lines_tags = self.line_ids.tax_tag_ids
|
|
it_tax_report_vj3_lines = self.env['account.report.line'].search([
|
|
('report_id.country_id.code', '=', 'IT'),
|
|
('code', '=', 'VJ3'),
|
|
])
|
|
vj3_lines_tags = it_tax_report_vj3_lines.expression_ids._get_matching_tags()
|
|
return bool(invoice_lines_tags & vj3_lines_tags)
|
|
|
|
def _l10n_it_edi_is_simplified(self):
|
|
"""
|
|
Simplified Invoices are a way for the invoice issuer to create an invoice with limited data.
|
|
Example: a consultant goes to the restaurant and wants the invoice instead of the receipt,
|
|
to be able to deduct the expense from his Taxes. The Italian State allows the restaurant
|
|
to issue a Simplified Invoice with the VAT number only, to speed up times, instead of
|
|
requiring the address and other informations about the buyer.
|
|
Only invoices under the threshold of 400 Euroes are allowed, to avoid this tool
|
|
be abused for bigger transactions, that would enable less transparency to tax institutions.
|
|
"""
|
|
self.ensure_one()
|
|
template_reference = self.env.ref('l10n_it_edi.account_invoice_it_simplified_FatturaPA_export', raise_if_not_found=False)
|
|
buyer = self.commercial_partner_id
|
|
checks = ['partner_address_missing', 'partner_vat_codice_fiscale_missing']
|
|
return bool(
|
|
template_reference
|
|
and not self.l10n_it_edi_is_self_invoice
|
|
and list(buyer._l10n_it_edi_export_check(checks).keys()) == ['l10n_it_edi_partner_address_missing']
|
|
and (not buyer.country_id or buyer.country_id.code == 'IT')
|
|
and (buyer.l10n_it_codice_fiscale or (buyer.vat and (buyer.vat[:2].upper() == 'IT' or buyer.vat[:2].isdecimal())))
|
|
and self.amount_total <= 400
|
|
)
|
|
|
|
def _l10n_it_edi_is_professional_fees(self):
|
|
"""
|
|
This function returns a boolean value based on the comparison of the lines values with a product.
|
|
If one line has the tag for professional fee then we return True
|
|
"""
|
|
self.ensure_one()
|
|
professional_fee_tag = self.env.ref('l10n_it_edi.l10n_it_edi_professional_fees_tag', raise_if_not_found=False)
|
|
if not professional_fee_tag:
|
|
return False
|
|
|
|
return any(
|
|
professional_fee_tag.id in line.account_id.tag_ids.ids
|
|
for line in self.invoice_line_ids
|
|
if line.display_type not in ('line_note', 'line_section')
|
|
)
|
|
|
|
def _l10n_it_edi_features_for_document_type_selection(self):
|
|
""" Returns a dictionary of features to be compared with the TDxx FatturaPA
|
|
document type requirements. """
|
|
partner_values = self.commercial_partner_id._l10n_it_edi_get_values()
|
|
services_or_goods = self._l10n_it_edi_services_or_goods()
|
|
return {
|
|
'move_types': self.move_type,
|
|
'partner_in_eu': partner_values.get('in_eu', False),
|
|
'partner_country_code': partner_values.get('country_code', False),
|
|
'simplified': self._l10n_it_edi_is_simplified(),
|
|
'self_invoice': self.l10n_it_edi_is_self_invoice,
|
|
'tax_tags': {tag for tag in self.line_ids.tax_tag_ids.mapped(lambda x: (x.name or '').upper().replace("+", "").replace("-", "")) if tag},
|
|
'downpayment': self._is_downpayment(),
|
|
'services_or_goods': services_or_goods,
|
|
'goods_in_italy': services_or_goods == 'consu' and self._l10n_it_edi_goods_in_italy(),
|
|
'professional_fees': self._l10n_it_edi_is_professional_fees(),
|
|
}
|
|
|
|
def _l10n_it_edi_document_type_mapping(self):
|
|
""" Returns a dictionary with the required features for every TDxx FatturaPA document type """
|
|
return {
|
|
'TD01': {'move_types': ['out_invoice'],
|
|
'import_type': 'in_invoice',
|
|
'self_invoice': False,
|
|
'simplified': False,
|
|
'downpayment': False,
|
|
'professional_fees': False},
|
|
'TD02': {'move_types': ['out_invoice'],
|
|
'import_type': 'in_invoice',
|
|
'self_invoice': False,
|
|
'simplified': False,
|
|
'downpayment': True,
|
|
'professional_fees': False},
|
|
'TD03': {'move_types': ['out_invoice'],
|
|
'import_type': 'in_invoice',
|
|
'self_invoice': False,
|
|
'simplified': False,
|
|
'downpayment': True,
|
|
'professional_fees': True},
|
|
'TD04': {'move_types': ['out_refund'],
|
|
'import_type': 'in_refund',
|
|
'self_invoice': False,
|
|
'simplified': False},
|
|
'TD05': {'move_types': ['out_refund'],
|
|
'import_type': 'in_refund',
|
|
'self_invoice': False,
|
|
'simplified': False},
|
|
'TD06': {'move_types': ['out_invoice'],
|
|
'import_type': 'in_invoice',
|
|
'self_invoice': False,
|
|
'simplified': False,
|
|
'downpayment': False,
|
|
'professional_fees': True},
|
|
'TD07': {'move_types': ['out_invoice'],
|
|
'import_type': 'in_invoice',
|
|
'self_invoice': False,
|
|
'simplified': True},
|
|
'TD08': {'move_types': ['out_refund'],
|
|
'import_type': 'in_refund',
|
|
'self_invoice': False,
|
|
'simplified': True},
|
|
'TD09': {'move_types': ['out_invoice'],
|
|
'import_type': 'in_invoice',
|
|
'self_invoice': False,
|
|
'simplified': True},
|
|
'TD28': {'move_types': ['in_invoice', 'in_refund'],
|
|
'import_type': 'in_invoice',
|
|
'simplified': False,
|
|
'self_invoice': True,
|
|
'partner_country_code': "SM"},
|
|
'TD16': {'move_types': ['in_invoice', 'in_refund'],
|
|
'import_type': 'in_invoice',
|
|
'simplified': False,
|
|
'self_invoice': True,
|
|
'tax_tags': {'VJ6', 'VJ7', 'VJ8', 'VJ12', 'VJ13', 'VJ14', 'VJ15', 'VJ16', 'VJ17'}},
|
|
'TD17': {'move_types': ['in_invoice', 'in_refund'],
|
|
'import_type': 'in_invoice',
|
|
'simplified': False,
|
|
'self_invoice': True,
|
|
'services_or_goods': "service",
|
|
'tax_tags': {'VJ3'}},
|
|
'TD18': {'move_types': ['in_invoice', 'in_refund'],
|
|
'import_type': 'in_invoice',
|
|
'simplified': False,
|
|
'self_invoice': True,
|
|
'services_or_goods': "consu",
|
|
'goods_in_italy': False,
|
|
'partner_in_eu': True,
|
|
'tax_tags': {'VJ9'}},
|
|
'TD19': {'move_types': ['in_invoice', 'in_refund'],
|
|
'import_type': 'in_invoice',
|
|
'simplified': False,
|
|
'self_invoice': True,
|
|
'services_or_goods': "consu",
|
|
'goods_in_italy': True,
|
|
'tax_tags': {'VJ3'}},
|
|
}
|
|
|
|
def _l10n_it_edi_get_document_type(self):
|
|
""" Compare the features of the invoice to the requirements of each Document Type (TDxx)
|
|
FatturaPA until you find a valid one. """
|
|
|
|
def compare(actual_values, expected_values):
|
|
""" Compare a single entry from the invoice features with the one of the document_type """
|
|
if isinstance(expected_values, set | list | tuple):
|
|
# i.e. When we compare actual tax_tags from the invoice with expected tags, we see if there is at least one in common
|
|
if isinstance(actual_values, set):
|
|
return actual_values & set(expected_values)
|
|
# i.e. When we compare the move_type with the available ones, these can be more than one
|
|
return actual_values in expected_values
|
|
# We compare other features directly, one on one
|
|
return actual_values == expected_values
|
|
|
|
invoice_features = self._l10n_it_edi_features_for_document_type_selection()
|
|
for document_type_code, document_type_features in self._l10n_it_edi_document_type_mapping().items():
|
|
# By using a generator instead of a list, we can avoid some comparisons
|
|
if all(compare(invoice_values, document_type_features[k]) for k, invoice_values in invoice_features.items() if k in document_type_features):
|
|
return document_type_code
|
|
return False
|
|
|
|
def _l10n_it_edi_is_simplified_document_type(self, document_type):
|
|
mapping = self._l10n_it_edi_document_type_mapping()
|
|
return mapping.get(document_type, {}).get('simplified', False)
|
|
|
|
@api.model
|
|
def _l10n_it_buyer_seller_info(self):
|
|
return {
|
|
'buyer': {
|
|
'role': 'buyer',
|
|
'section_xpath': '//CessionarioCommittente',
|
|
'vat_xpath': '//CessionarioCommittente//IdCodice',
|
|
'codice_fiscale_xpath': '//CessionarioCommittente//CodiceFiscale',
|
|
'type_tax_use_domain': [('type_tax_use', '=', 'purchase')],
|
|
},
|
|
'seller': {
|
|
'role': 'seller',
|
|
'section_xpath': '//CedentePrestatore',
|
|
'vat_xpath': '//CedentePrestatore//IdCodice',
|
|
'codice_fiscale_xpath': '//CedentePrestatore//CodiceFiscale',
|
|
'type_tax_use_domain': [('type_tax_use', '=', 'sale')],
|
|
},
|
|
}
|
|
|
|
# -------------------------------------------------------------------------
|
|
# EDI: Import
|
|
# -------------------------------------------------------------------------
|
|
|
|
def cron_l10n_it_edi_download_and_update(self):
|
|
""" Crons run with sudo(), with empty recordset. Remember that. """
|
|
retrigger = False
|
|
for proxy_user in self.env['account_edi_proxy_client.user'].search([('proxy_type', '=', 'l10n_it_edi')]):
|
|
proxy_user = proxy_user.with_company(proxy_user.company_id)
|
|
if proxy_user.edi_mode != 'demo':
|
|
moves_to_check = self.search([
|
|
('company_id', '=', proxy_user.company_id.id),
|
|
('l10n_it_edi_transaction', '!=', False),
|
|
('l10n_it_edi_state', 'in', WAITING_STATES)
|
|
])
|
|
if moves_to_check:
|
|
moves_to_check._l10n_it_edi_update_send_state()
|
|
retrigger = retrigger or self._l10n_it_edi_download_invoices(proxy_user)
|
|
|
|
# Retrigger download if there are still some on the server
|
|
if retrigger:
|
|
_logger.info('Retriggering "Receive invoices from the SdI"...')
|
|
self.env.ref('l10n_it_edi.ir_cron_l10n_it_edi_download_and_update')._trigger()
|
|
|
|
def _l10n_it_edi_download_invoices(self, proxy_user):
|
|
""" Check the proxy for incoming invoices for a specified proxy user.
|
|
:return: True if there remain some invoices on the server to be downloaded, False otherwise.
|
|
"""
|
|
server_url = proxy_user._get_server_url()
|
|
|
|
# Download invoices
|
|
invoices_data = {}
|
|
try:
|
|
invoices_data = proxy_user._make_request(f'{server_url}/api/l10n_it_edi/1/in/RicezioneInvoice',
|
|
params={'recipient_codice_fiscale': proxy_user.company_id.l10n_it_codice_fiscale})
|
|
except AccountEdiProxyError as e:
|
|
_logger.error('Error while receiving invoices from the SdI: %s', e)
|
|
return False
|
|
|
|
# Process the downloaded invoices
|
|
processed = self._l10n_it_edi_process_downloads(invoices_data, proxy_user)
|
|
if processed['proxy_acks']:
|
|
try:
|
|
proxy_user._make_request(
|
|
f'{server_url}/api/l10n_it_edi/1/ack',
|
|
params={'transaction_ids': processed['proxy_acks']})
|
|
except AccountEdiProxyError as e:
|
|
_logger.error('Error while receiving file from the SdI: %s', e)
|
|
|
|
return processed['retrigger']
|
|
|
|
def _l10n_it_edi_process_downloads(self, invoices_data, proxy_user):
|
|
""" Every attachment will be committed if stored succesfully.
|
|
Also moves will be committed one by one, even if imported incorrectly.
|
|
"""
|
|
proxy_acks = []
|
|
retrigger = False
|
|
moves = self.env['account.move']
|
|
|
|
for id_transaction, invoice_data in invoices_data.items():
|
|
|
|
# The IAP server has a maximum number of documents it can send.
|
|
# If that maximum is reached, then we search for more
|
|
# by re-triggering the download cron, avoiding the timeout.
|
|
current_num = invoice_data.get('current_num', 0)
|
|
max_num = invoice_data.get('max_num', 0)
|
|
retrigger = retrigger or current_num == max_num > 0
|
|
|
|
# `_l10n_it_edi_create_move_from_attachment` will create an empty move
|
|
# then try and fill it with the content imported from the attachment.
|
|
# Should the import fail, thanks to try..except and savepoint,
|
|
# we will anyway end up with an empty `in_invoice` with the attachment posted on it.
|
|
if move := self.with_company(proxy_user.company_id)._l10n_it_edi_create_move_with_attachment(
|
|
invoice_data['filename'],
|
|
invoice_data['file'],
|
|
invoice_data['key'],
|
|
proxy_user,
|
|
):
|
|
|
|
if not modules.module.current_test:
|
|
self.env.cr.commit()
|
|
moves |= move
|
|
proxy_acks.append(id_transaction)
|
|
|
|
# Extend created moves with the related attachments and commit
|
|
for move in moves:
|
|
move._extend_with_attachments(move.l10n_it_edi_attachment_id, new=True)
|
|
if not modules.module.current_test:
|
|
self.env.cr.commit()
|
|
|
|
return {"retrigger": retrigger, "proxy_acks": proxy_acks}
|
|
|
|
def _l10n_it_edi_create_move_with_attachment(self, filename, content, key, proxy_user):
|
|
""" Creates a move and save an incoming file from the SdI as its attachment.
|
|
|
|
:param filename: name of the file to be saved.
|
|
:param content: encrypted content of the file to be saved.
|
|
:param key: key to decrypt the file.
|
|
:param proxy_user: the AccountEdiProxyClientUser to use for decrypting the file
|
|
"""
|
|
|
|
# Name should be unique per company, the invoice already exists
|
|
Attachment = self.env['ir.attachment'].sudo().with_company(proxy_user.company_id)
|
|
if Attachment.search_count([
|
|
('name', '=', filename),
|
|
('res_model', '=', 'account.move'),
|
|
('res_field', '=', 'l10n_it_edi_attachment_file'),
|
|
('company_id', '=', proxy_user.company_id.id),
|
|
], limit=1):
|
|
_logger.warning('E-invoice already exists: %s', filename)
|
|
return False
|
|
|
|
# Decrypt with the server key
|
|
try:
|
|
decrypted_content = proxy_user._decrypt_data(content, key)
|
|
except Exception as e: # noqa: BLE001
|
|
_logger.warning("Cannot decrypt e-invoice: %s, %s", filename, e)
|
|
return False
|
|
|
|
# Create the attachment, an empty move, then attach the two and commit
|
|
move = self.with_company(proxy_user.company_id).create({})
|
|
attachment = Attachment.create({
|
|
'name': filename,
|
|
'raw': decrypted_content,
|
|
'type': 'binary',
|
|
'res_model': 'account.move',
|
|
'res_id': move.id,
|
|
'res_field': 'l10n_it_edi_attachment_file'
|
|
})
|
|
move.with_context(
|
|
account_predictive_bills_disable_prediction=True,
|
|
no_new_invoice=True,
|
|
).message_post(attachment_ids=attachment.ids)
|
|
|
|
return move
|
|
|
|
def _l10n_it_edi_search_partner(self, company, vat, codice_fiscale, email):
|
|
for domain in [vat and [('vat', 'ilike', vat)],
|
|
codice_fiscale and [('l10n_it_codice_fiscale', 'in', ('IT' + codice_fiscale, codice_fiscale))],
|
|
email and ['|', ('email', '=', email), ('l10n_it_pec_email', '=', email)]]:
|
|
if domain and (partner := self.env['res.partner'].search(
|
|
domain + self.env['res.partner']._check_company_domain(company), limit=1)):
|
|
return partner
|
|
return self.env['res.partner']
|
|
|
|
def _l10n_it_edi_search_tax_for_import(self, company, percentage, extra_domain=None, l10n_it_exempt_reason=None):
|
|
""" Returns the VAT, Withholding or Pension Fund tax that suits the conditions given
|
|
and matches the percentage found in the XML for the company. """
|
|
|
|
# The tax "23% Ritenuta Agenti e Rappresentanti" is not supported because it's supposed to be a tax of 23% based on
|
|
# 50% of the base amount. It's currently implemented as a -11.5% tax. So on 1000, it gives an amount of -115.
|
|
# We need to fix the base amount from 1000 to 500.0.
|
|
if percentage == -23.0:
|
|
percentage = -11.5
|
|
|
|
domain = [
|
|
*self.env['account.tax']._check_company_domain(company),
|
|
('amount_type', '=', 'percent'),
|
|
] + (extra_domain or [])
|
|
|
|
# We suppose we're importing a file that comes in as a customer invoice where the sale tax will be 0%.
|
|
# To retrieve the correct purchase tax, we examine the sale tax's l10n_it_exempt_reason.
|
|
# We determine whether the l10n_it_exempt_reason is specific to reverse charge.
|
|
reversed_tax_tag = self._l10n_it_edi_exempt_reason_tag_mapping().get(l10n_it_exempt_reason, '')
|
|
if not reversed_tax_tag:
|
|
# Normal VAT taxes have a known percentage and generally have all positive repartition lines
|
|
domain += [('amount', '=', percentage), ('l10n_it_exempt_reason', '=', l10n_it_exempt_reason)]
|
|
taxes = self.env['account.tax'].search(domain).filtered(
|
|
lambda tax: all(rep_line.factor_percent >= 0 for rep_line in tax.invoice_repartition_line_ids))
|
|
else:
|
|
# In case of reverse charge, the purchase tax has a negative repartition line.
|
|
domain += [('invoice_repartition_line_ids.tag_ids.name', '=', f'+{reversed_tax_tag.lower()}')]
|
|
taxes = self.env['account.tax'].search(domain, order="amount desc").filtered(
|
|
lambda tax: any(rep_line.factor_percent < 0 for rep_line in tax.invoice_repartition_line_ids))
|
|
|
|
return taxes[0] if taxes else taxes
|
|
|
|
def _l10n_it_edi_get_extra_info(self, company, document_type, body_tree, incoming=True):
|
|
""" This function is meant to collect other information that has to be inserted on the invoice lines by submodules.
|
|
:return extra_info, messages_to_log"""
|
|
return {
|
|
'simplified': self.env['account.move']._l10n_it_edi_is_simplified_document_type(document_type),
|
|
'type_tax_use_domain': [('type_tax_use', '=', 'purchase' if incoming else 'sale')],
|
|
}, []
|
|
|
|
def _l10n_it_edi_import_invoice(self, invoice, data, is_new):
|
|
""" Decodes a l10n_it_edi move into an Odoo move.
|
|
|
|
:param data: the dictionary with the content to be imported
|
|
keys: 'filename', 'content', 'xml_tree', 'type', 'sort_weight'
|
|
:param is_new: whether the move is newly created or to be updated
|
|
:returns: the imported move
|
|
"""
|
|
with self._get_edi_creation() as self:
|
|
buyer_seller_info = self._l10n_it_buyer_seller_info()
|
|
|
|
tree = data['xml_tree']
|
|
company = self.company_id
|
|
|
|
# There are 2 cases:
|
|
# - cron:
|
|
# * Move direction (incoming / outgoing) flexible (no 'default_move_type')
|
|
# * I.e. used for import from tax agency
|
|
# - "Upload" button (invoices / bills view)
|
|
# * Fixed move direction; the button sets the 'default_move_type'
|
|
default_move_type = self.env.context.get('default_move_type')
|
|
if default_move_type is None:
|
|
incoming_possibilities = [True, False]
|
|
elif default_move_type in invoice.get_purchase_types(include_receipts=True):
|
|
incoming_possibilities = [True]
|
|
elif default_move_type in invoice.get_sale_types(include_receipts=True):
|
|
incoming_possibilities = [False]
|
|
else:
|
|
_logger.warning("Cannot handle default_move_type '%s'.", default_move_type)
|
|
return
|
|
|
|
for incoming in incoming_possibilities:
|
|
company_role, partner_role = ('buyer', 'seller') if incoming else ('seller', 'buyer')
|
|
company_info = buyer_seller_info[company_role]
|
|
vat = get_text(tree, company_info['vat_xpath'])
|
|
if vat and vat .casefold() in (company.vat or '').casefold():
|
|
break
|
|
codice_fiscale = get_text(tree, company_info['codice_fiscale_xpath'])
|
|
if codice_fiscale and codice_fiscale.casefold() in (company.l10n_it_codice_fiscale or '').casefold():
|
|
break
|
|
else:
|
|
invoice.message_post(body=_("Your company's VAT number and Fiscal Code haven't been found in the buyer and/or seller sections inside the document."))
|
|
return
|
|
|
|
# For unsupported document types, just assume in_invoice, and log that the type is unsupported
|
|
document_type = get_text(tree, '//DatiGeneraliDocumento/TipoDocumento')
|
|
move_type = self._l10n_it_edi_document_type_mapping().get(document_type, {}).get('import_type')
|
|
if not move_type:
|
|
move_type = "in_invoice"
|
|
_logger.info('Document type not managed: %s. Invoice type is set by default.', document_type)
|
|
if not incoming and move_type.startswith('in_'):
|
|
move_type = 'out' + move_type[2:]
|
|
|
|
self.move_type = move_type
|
|
|
|
if self.name and self.name != '/':
|
|
# the journal might've changed, so we need to recompute the name in case it was set (first entry in journal)
|
|
self.name = False
|
|
self._compute_name()
|
|
|
|
# Collect extra info from the XML that may be used by submodules to further put information on the invoice lines
|
|
extra_info, message_to_log = self._l10n_it_edi_get_extra_info(company, document_type, tree, incoming=incoming)
|
|
|
|
# Partner
|
|
partner_info = buyer_seller_info[partner_role]
|
|
vat = get_text(tree, partner_info['vat_xpath'])
|
|
codice_fiscale = get_text(tree, partner_info['codice_fiscale_xpath'])
|
|
email = get_text(tree, '//DatiTrasmissione//Email') if partner_info['role'] == 'seller' else ''
|
|
if partner := self._l10n_it_edi_search_partner(company, vat, codice_fiscale, email):
|
|
self.partner_id = partner
|
|
else:
|
|
message = Markup("<br/>").join((
|
|
_("Partner not found, useful informations from XML file:"),
|
|
self._compose_info_message(tree, partner_info['section_xpath'])
|
|
))
|
|
message_to_log.append(message)
|
|
|
|
# Numbering attributed by the transmitter
|
|
if progressive_id := get_text(tree, '//ProgressivoInvio'):
|
|
self.payment_reference = progressive_id
|
|
|
|
# Document Number
|
|
if number := get_text(tree, './/DatiGeneraliDocumento//Numero'):
|
|
self.ref = number
|
|
|
|
# Currency
|
|
if currency_str := get_text(tree, './/DatiGeneraliDocumento/Divisa'):
|
|
currency = self.env.ref('base.%s' % currency_str.upper(), raise_if_not_found=False)
|
|
if currency != self.env.company.currency_id and currency.active:
|
|
self.currency_id = currency
|
|
|
|
# Date
|
|
if document_date := get_date(tree, './/DatiGeneraliDocumento/Data'):
|
|
self.invoice_date = document_date
|
|
else:
|
|
message_to_log.append(_("Document date invalid in XML file: %s", document_date))
|
|
|
|
# Stamp Duty
|
|
if stamp_duty := get_text(tree, './/DatiGeneraliDocumento/DatiBollo/ImportoBollo'):
|
|
self.l10n_it_stamp_duty = float(stamp_duty)
|
|
|
|
# Comment
|
|
for narration in get_text(tree, './/DatiGeneraliDocumento//Causale', many=True):
|
|
self.narration = '%s%s<br/>' % (self.narration or '', narration)
|
|
|
|
# Informations relative to the purchase order, the contract, the agreement,
|
|
# the reception phase or invoices previously transmitted
|
|
# <2.1.2> - <2.1.6>
|
|
for document_type in ['DatiOrdineAcquisto', 'DatiContratto', 'DatiConvenzione', 'DatiRicezione', 'DatiFattureCollegate']:
|
|
for element in tree.xpath('.//DatiGenerali/' + document_type):
|
|
message = Markup("{} {}<br/>{}").format(document_type, _("from XML file:"), self._compose_info_message(element, '.'))
|
|
message_to_log.append(message)
|
|
|
|
# Dati DDT. <2.1.8>
|
|
if elements := tree.xpath('.//DatiGenerali/DatiDDT'):
|
|
message = Markup("<br/>").join((
|
|
_("Transport informations from XML file:"),
|
|
self._compose_info_message(tree, './/DatiGenerali/DatiDDT')
|
|
))
|
|
message_to_log.append(message)
|
|
|
|
# Due date. <2.4.2.5>
|
|
if due_date := get_date(tree, './/DatiPagamento/DettaglioPagamento/DataScadenzaPagamento'):
|
|
self.invoice_date_due = fields.Date.to_string(due_date)
|
|
else:
|
|
message_to_log.append(_("Payment due date invalid in XML file: %s", str(due_date)))
|
|
|
|
# Information related to the purchase order <2.1.2>
|
|
if (po_refs := get_text(tree, '//DatiGenerali/DatiOrdineAcquisto/IdDocumento', many=True)):
|
|
self.invoice_origin = ", ".join(po_refs)
|
|
|
|
# Total amount. <2.4.2.6>
|
|
if amount_total := sum(float(x) for x in get_text(tree, './/ImportoPagamento', many=True) if x):
|
|
message_to_log.append(_("Total amount from the XML File: %s", amount_total))
|
|
|
|
# Bank account. <2.4.2.13>
|
|
if self.move_type not in ('out_invoice', 'in_refund'):
|
|
if acc_number := get_text(tree, './/DatiPagamento/DettaglioPagamento/IBAN'):
|
|
if self.partner_id and self.partner_id.commercial_partner_id:
|
|
bank = self.env['res.partner.bank'].search([
|
|
('acc_number', '=', acc_number),
|
|
('partner_id', '=', self.partner_id.commercial_partner_id.id),
|
|
('company_id', 'in', [self.company_id.id, False])
|
|
], order='company_id', limit=1)
|
|
else:
|
|
bank = self.env['res.partner.bank'].search([
|
|
('acc_number', '=', acc_number),
|
|
('company_id', 'in', [self.company_id.id, False])
|
|
], order='company_id', limit=1)
|
|
if bank:
|
|
self.partner_bank_id = bank
|
|
else:
|
|
message = Markup("<br/>").join((
|
|
_("Bank account not found, useful informations from XML file:"),
|
|
self._compose_info_message(tree, [
|
|
'.//DatiPagamento//Beneficiario',
|
|
'.//DatiPagamento//IstitutoFinanziario',
|
|
'.//DatiPagamento//IBAN',
|
|
'.//DatiPagamento//ABI',
|
|
'.//DatiPagamento//CAB',
|
|
'.//DatiPagamento//BIC',
|
|
'.//DatiPagamento//ModalitaPagamento'
|
|
])
|
|
))
|
|
message_to_log.append(message)
|
|
elif elements := tree.xpath('.//DatiPagamento/DettaglioPagamento'):
|
|
message = Markup("<br/>").join((
|
|
_("Bank account not found, useful informations from XML file:"),
|
|
self._compose_info_message(tree, './/DatiPagamento')
|
|
))
|
|
message_to_log.append(message)
|
|
|
|
# Invoice lines. <2.2.1>
|
|
tag_name = './/DettaglioLinee' if not extra_info['simplified'] else './/DatiBeniServizi'
|
|
for element in tree.xpath(tag_name):
|
|
move_line = self.invoice_line_ids.create({
|
|
'move_id': self.id,
|
|
'tax_ids': [fields.Command.clear()]})
|
|
if move_line:
|
|
message_to_log += self._l10n_it_edi_import_line(element, move_line, extra_info)
|
|
|
|
# Global discount summarized in 1 amount
|
|
if discount_elements := tree.xpath('.//DatiGeneraliDocumento/ScontoMaggiorazione'):
|
|
taxable_amount = float(self.tax_totals['base_amount_currency'])
|
|
discounted_amount = taxable_amount
|
|
for discount_element in discount_elements:
|
|
discount_sign = 1
|
|
if (discount_type := discount_element.xpath('.//Tipo')) and discount_type[0].text == 'MG':
|
|
discount_sign = -1
|
|
if discount_amount := get_text(discount_element, './/Importo'):
|
|
discounted_amount -= discount_sign * float(discount_amount)
|
|
continue
|
|
if discount_percentage := get_text(discount_element, './/Percentuale'):
|
|
discounted_amount *= 1 - discount_sign * float(discount_percentage) / 100
|
|
|
|
general_discount = discounted_amount - taxable_amount
|
|
sequence = len(elements) + 1
|
|
|
|
self.invoice_line_ids = [Command.create({
|
|
'sequence': sequence,
|
|
'name': 'SCONTO' if general_discount < 0 else 'MAGGIORAZIONE',
|
|
'price_unit': general_discount,
|
|
})]
|
|
|
|
for element in tree.xpath('.//Allegati'):
|
|
attachment_64 = self.env['ir.attachment'].create({
|
|
'name': get_text(element, './/NomeAttachment'),
|
|
'datas': str.encode(get_text(element, './/Attachment')),
|
|
'type': 'binary',
|
|
'res_model': 'account.move',
|
|
'res_id': self.id,
|
|
})
|
|
|
|
# no_new_invoice to prevent from looping on the.message_post that would create a new invoice without it
|
|
self.with_context(no_new_invoice=True).sudo().message_post(
|
|
body=(_("Attachment from XML")),
|
|
attachment_ids=[attachment_64.id],
|
|
)
|
|
|
|
for message in message_to_log:
|
|
self.sudo().message_post(body=message)
|
|
return self
|
|
|
|
@api.model
|
|
def _is_prediction_enabled(self):
|
|
return self.env['ir.module.module'].search([('name', '=', 'account_accountant'), ('state', '=', 'installed')])
|
|
|
|
def _l10n_it_edi_import_line(self, element, move_line, extra_info=None):
|
|
extra_info = extra_info or {}
|
|
company = move_line.company_id
|
|
partner = move_line.partner_id
|
|
message_to_log = []
|
|
predict_enabled = self._is_prediction_enabled()
|
|
|
|
# Sequence.
|
|
line_elements = element.xpath('.//NumeroLinea')
|
|
if line_elements:
|
|
move_line.sequence = int(line_elements[0].text)
|
|
|
|
# Name.
|
|
move_line.name = " ".join(get_text(element, './/Descrizione').split())
|
|
|
|
# Product.
|
|
if elements_code := element.xpath('.//CodiceArticolo'):
|
|
for element_code in elements_code:
|
|
type_code = element_code.xpath('.//CodiceTipo')[0]
|
|
code = element_code.xpath('.//CodiceValore')[0]
|
|
product = self.env['product.product'].search([('barcode', '=', code.text)])
|
|
if (product and type_code.text == 'EAN'):
|
|
move_line.product_id = product
|
|
break
|
|
if partner:
|
|
product_supplier = self.env['product.supplierinfo'].search([('partner_id', '=', partner.id), ('product_code', '=', code.text)], limit=2)
|
|
if product_supplier and len(product_supplier) == 1 and product_supplier.product_id:
|
|
move_line.product_id = product_supplier.product_id
|
|
break
|
|
if not move_line.product_id:
|
|
for element_code in elements_code:
|
|
code = element_code.xpath('.//CodiceValore')[0]
|
|
product = self.env['product.product'].search([('default_code', '=', code.text)], limit=2)
|
|
if product and len(product) == 1:
|
|
move_line.product_id = product
|
|
break
|
|
|
|
# If no product is found, try to find a product that may be fitting
|
|
if predict_enabled and not move_line.product_id:
|
|
fitting_product = move_line._predict_product()
|
|
if fitting_product:
|
|
name = move_line.name
|
|
move_line.product_id = fitting_product
|
|
move_line.name = name
|
|
|
|
if predict_enabled:
|
|
# Fitting account for the line
|
|
fitting_account = move_line._predict_account()
|
|
if fitting_account:
|
|
move_line.account_id = fitting_account
|
|
|
|
# Quantity.
|
|
move_line.quantity = float(get_text(element, './/Quantita') or '1')
|
|
|
|
# Taxes
|
|
percentage = None
|
|
if not extra_info['simplified']:
|
|
percentage = get_float(element, './/AliquotaIVA')
|
|
if price_unit := get_float(element, './/PrezzoUnitario'):
|
|
move_line.price_unit = price_unit
|
|
elif amount := get_float(element, './/Importo'):
|
|
percentage = get_float(element, './/Aliquota')
|
|
if not percentage and (tax_amount := get_float(element, './/Imposta')):
|
|
percentage = round(tax_amount / (amount - tax_amount) * 100)
|
|
move_line.price_unit = amount / (1 + percentage / 100)
|
|
|
|
move_line.tax_ids = []
|
|
if percentage is not None:
|
|
l10n_it_exempt_reason = get_text(element, './/Natura').upper() or False
|
|
extra_domain = extra_info.get('type_tax_use_domain', [('type_tax_use', '=', 'purchase')])
|
|
if tax := self._l10n_it_edi_search_tax_for_import(company, percentage, extra_domain, l10n_it_exempt_reason=l10n_it_exempt_reason):
|
|
move_line.tax_ids += tax
|
|
else:
|
|
message = Markup("<br/>").join((
|
|
_("Tax not found for line with description '%s'", move_line.name),
|
|
self._compose_info_message(element, '.')
|
|
))
|
|
message_to_log.append(message)
|
|
|
|
# If no taxes were found, try to find taxes that may be fitting
|
|
if predict_enabled and not move_line.tax_ids:
|
|
fitting_taxes = move_line._predict_taxes()
|
|
if fitting_taxes:
|
|
move_line.tax_ids = [Command.set(fitting_taxes)]
|
|
|
|
# Discounts
|
|
if elements := element.xpath('.//ScontoMaggiorazione'):
|
|
# Special case of only 1 percentage discount
|
|
if len(elements) == 1:
|
|
element = elements[0]
|
|
if discount_percentage := get_float(element, './/Percentuale'):
|
|
discount_type = get_text(element, './/Tipo')
|
|
discount_sign = -1 if discount_type == 'MG' else 1
|
|
move_line.discount = discount_sign * discount_percentage
|
|
# Discounts in cascade summarized in 1 percentage
|
|
else:
|
|
total = get_float(element, './/PrezzoTotale')
|
|
discount = 100 - (100 * total) / (move_line.quantity * move_line.price_unit)
|
|
move_line.discount = discount
|
|
|
|
return message_to_log
|
|
|
|
def _l10n_it_edi_format_errors(self, header, errors):
|
|
return Markup('{}<ul class="mb-0">{}</ul>').format(
|
|
nl2br_enclose(header, 'span') if header else '',
|
|
Markup().join(nl2br_enclose(' '.join(error.split()), 'li') for error in errors)
|
|
)
|
|
|
|
def _compose_info_message(self, tree, tags):
|
|
result = ""
|
|
for tag in tags if isinstance(tags, list) else [tags]:
|
|
for el in tree.xpath(tag):
|
|
result += self._l10n_it_edi_format_errors("", [f'{subel.tag}: {subel.text}' for subel in el.iter()])
|
|
return result
|
|
|
|
# -------------------------------------------------------------------------
|
|
# EDI: Export
|
|
# -------------------------------------------------------------------------
|
|
|
|
def _l10n_it_edi_export_data_check(self):
|
|
""" This function checks the Settings, Company, Partners, Moves involved in the
|
|
sending activity and returns an errors dictionary ready for the
|
|
actionable_errors widget to display. """
|
|
|
|
companies = self.mapped("company_id")
|
|
companies_partners = companies.mapped("partner_id")
|
|
moves_full = self.filtered(lambda m: not m._l10n_it_edi_is_simplified())
|
|
moves_simplified = self.filtered(lambda m: m._l10n_it_edi_is_simplified())
|
|
|
|
full = moves_full.mapped("commercial_partner_id").filtered(lambda p: p not in companies_partners)
|
|
simplified = moves_simplified.mapped("commercial_partner_id").filtered(lambda p: p not in companies_partners | full)
|
|
representatives = companies.mapped("l10n_it_tax_representative_partner_id").filtered(lambda p: p not in companies_partners | simplified | full)
|
|
|
|
return {
|
|
**companies._l10n_it_edi_export_check(),
|
|
**full._l10n_it_edi_export_check(['partner_address_missing']),
|
|
**simplified._l10n_it_edi_export_check(['partner_country_missing']),
|
|
**(simplified | full)._l10n_it_edi_export_check(['partner_vat_codice_fiscale_missing']),
|
|
**representatives._l10n_it_edi_export_check(['partner_vat_missing']),
|
|
**self._l10n_it_edi_base_export_check(),
|
|
**self._l10n_it_edi_export_taxes_check(),
|
|
}
|
|
|
|
def _l10n_it_edi_base_export_check(self):
|
|
def build_error(message, records):
|
|
return {
|
|
'message': message,
|
|
**({
|
|
'action_text': _("View invoice(s)"),
|
|
'action': records._get_records_action(name=_("Invoice(s) to check")),
|
|
} if len(self) > 1 else {})
|
|
}
|
|
|
|
errors = {}
|
|
if moves := self.filtered(lambda move: move.l10n_it_edi_is_self_invoice and move._l10n_it_edi_services_or_goods() == 'both'):
|
|
errors['l10n_it_edi_move_rc_mixed_product_types'] = build_error(
|
|
message=_("Cannot apply Reverse Charge to bills which contains both services and goods."),
|
|
records=moves)
|
|
|
|
if pa_moves := self.filtered(lambda move: move.commercial_partner_id._l10n_it_edi_is_public_administration()):
|
|
if moves := pa_moves.filtered(lambda move: not move.l10n_it_origin_document_type):
|
|
message = _("Partner(s) belongs to the Public Administration, please fill out Origin Document Type field in the Electronic Invoicing tab.")
|
|
errors['move_missing_origin_document'] = build_error(message=message, records=moves)
|
|
if moves := pa_moves.filtered(lambda move: move.l10n_it_origin_document_date and move.l10n_it_origin_document_date > fields.Date.today()):
|
|
message = _("The Origin Document Date cannot be in the future.")
|
|
errors['l10n_it_edi_move_future_origin_document_date'] = build_error(message=message, records=moves)
|
|
if pa_moves := self.filtered(lambda move: len(move.commercial_partner_id.l10n_it_pa_index or '') == 7):
|
|
if moves := pa_moves.filtered(lambda move: not move.l10n_it_origin_document_type and move.l10n_it_cig and move.l10n_it_cup):
|
|
message = _("CIG/CUP fields of partner(s) are present, please fill out Origin Document Type field in the Electronic Invoicing tab.")
|
|
errors['move_missing_origin_document_field'] = build_error(message=message, records=moves)
|
|
return errors
|
|
|
|
def _l10n_it_edi_export_taxes_check(self):
|
|
if move_lines := self.mapped("invoice_line_ids").filtered(lambda line:
|
|
line.display_type == 'product'
|
|
and len(line.tax_ids.flatten_taxes_hierarchy()._l10n_it_filter_kind('vat')) != 1
|
|
):
|
|
return {
|
|
'l10n_it_edi_move_only_one_vat_tax_per_line': {
|
|
'message': _("Invoices must have exactly one VAT tax set per line."),
|
|
**({
|
|
'action_text': _("View invoice(s)"),
|
|
'action': move_lines.mapped("move_id")._get_records_action(name=_("Check taxes on invoice lines")),
|
|
} if len(self) > 1 else {})
|
|
}}
|
|
return {}
|
|
|
|
def _l10n_it_edi_get_formatters(self):
|
|
def format_alphanumeric(text, maxlen=None):
|
|
if not text:
|
|
return False
|
|
text = text.encode('latin-1', 'replace').decode('latin-1')
|
|
if maxlen and maxlen > 0:
|
|
text = text[:maxlen]
|
|
elif maxlen and maxlen < 0:
|
|
text = text[maxlen:]
|
|
return text
|
|
|
|
def format_date(dt):
|
|
# Format the date in the italian standard.
|
|
dt = dt or datetime.now()
|
|
return dt.strftime('%Y-%m-%d')
|
|
|
|
def format_monetary(number, currency):
|
|
# Format the monetary values to avoid trailing decimals (e.g. 90.85000000000001).
|
|
return float_repr(number, min(2, currency.decimal_places))
|
|
|
|
def format_float(amount, precision):
|
|
if amount is None or amount is False:
|
|
return None
|
|
# Avoid things like -0.0, see: https://stackoverflow.com/a/11010869
|
|
return '%.*f' % (precision, amount if not float_is_zero(amount, precision_digits=precision) else 0.0)
|
|
|
|
def format_numbers(number):
|
|
#format number to str with between 2 and 8 decimals (event if it's .00)
|
|
number_splited = str(number).split('.')
|
|
if len(number_splited) == 1:
|
|
return "%.02f" % number
|
|
|
|
cents = number_splited[1]
|
|
if len(cents) > 8:
|
|
return "%.08f" % number
|
|
return float_repr(number, max(2, len(cents)))
|
|
|
|
def format_numbers_two(number):
|
|
#format number to str with 2 (event if it's .00)
|
|
return "%.02f" % number
|
|
|
|
def format_phone(number):
|
|
if not number:
|
|
return False
|
|
number = number.replace(' ', '').replace('/', '').replace('.', '')
|
|
if len(number) > 4 and len(number) < 13:
|
|
return format_alphanumeric(number)
|
|
return False
|
|
|
|
def format_address(street, street2, maxlen=60):
|
|
street, street2 = street or '', street2 or ''
|
|
if street and len(street) >= maxlen:
|
|
street2 = ''
|
|
sep = ' ' if street and street2 else ''
|
|
return format_alphanumeric(f"{street}{sep}{street2}", maxlen)
|
|
|
|
return {
|
|
'format_date': format_date,
|
|
'format_float': format_float,
|
|
'format_monetary': format_monetary,
|
|
'format_numbers': format_numbers,
|
|
'format_numbers_two': format_numbers_two,
|
|
'format_phone': format_phone,
|
|
'format_alphanumeric': format_alphanumeric,
|
|
'format_address': format_address,
|
|
}
|
|
|
|
def _l10n_it_edi_render_xml(self, pdf_values=None):
|
|
''' Create the xml file content.
|
|
:return: The XML content as bytestring.
|
|
'''
|
|
qweb_template_name = (
|
|
'l10n_it_edi.account_invoice_it_FatturaPA_export' if not self._l10n_it_edi_is_simplified()
|
|
else 'l10n_it_edi.account_invoice_it_simplified_FatturaPA_export')
|
|
xml_content = self.env['ir.qweb']._render(qweb_template_name, {
|
|
**self._l10n_it_edi_get_values(pdf_values),
|
|
**self._l10n_it_edi_get_formatters()})
|
|
xml_node = cleanup_xml_node(xml_content, remove_blank_nodes=False)
|
|
return etree.tostring(xml_node, xml_declaration=True, encoding='UTF-8')
|
|
|
|
def _l10n_it_edi_get_attachment_values(self, pdf_values=None):
|
|
self.ensure_one()
|
|
return {
|
|
'name': self._l10n_it_edi_generate_filename(),
|
|
'type': 'binary',
|
|
'mimetype': 'application/xml',
|
|
'description': _('IT EDI e-move: %s', self.move_type),
|
|
'company_id': self.company_id.id,
|
|
'res_id': self.id,
|
|
'res_model': self._name,
|
|
'res_field': 'l10n_it_edi_attachment_file',
|
|
'raw': self._l10n_it_edi_render_xml(pdf_values=pdf_values),
|
|
}
|
|
|
|
def _l10n_it_edi_generate_filename(self):
|
|
'''Returns a name conform to the Fattura pa Specifications:
|
|
See ES documentation 2.2
|
|
'''
|
|
a = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
|
|
# Each company should have its own filename sequence. If it does not exist, create it
|
|
n = self.env['ir.sequence'].with_company(self.company_id).next_by_code('l10n_it_edi.fattura_filename')
|
|
if not n:
|
|
# The offset is used to avoid conflicts with existing filenames
|
|
offset = 62 ** 4
|
|
sequence = self.env['ir.sequence'].sudo().create({
|
|
'name': 'FatturaPA Filename Sequence',
|
|
'code': 'l10n_it_edi.fattura_filename',
|
|
'company_id': self.company_id.id,
|
|
'number_next': offset,
|
|
})
|
|
n = sequence._next()
|
|
# The n is returned as a string, but we require an int
|
|
n = int(''.join(filter(lambda c: c.isdecimal(), n)))
|
|
|
|
progressive_number = ""
|
|
while n:
|
|
(n, m) = divmod(n, len(a))
|
|
progressive_number = a[m] + progressive_number
|
|
|
|
return '%(country_code)s%(codice)s_%(progressive_number)s.xml' % {
|
|
'country_code': self.company_id.country_id.code,
|
|
'codice': self.company_id.partner_id._l10n_it_edi_normalized_codice_fiscale(),
|
|
'progressive_number': progressive_number.zfill(5),
|
|
}
|
|
|
|
def _l10n_it_edi_send(self, attachments_vals):
|
|
files_to_upload = []
|
|
filename_move = {}
|
|
|
|
# Setup moves for sending
|
|
for move in self:
|
|
attachment_vals = attachments_vals[move]
|
|
filename = attachment_vals['name']
|
|
content = b64encode(attachment_vals['raw']).decode()
|
|
move.l10n_it_edi_header = False
|
|
if move.commercial_partner_id._l10n_it_edi_is_public_administration():
|
|
move.l10n_it_edi_state = 'requires_user_signature'
|
|
move.l10n_it_edi_transaction = False
|
|
move.sudo().message_post(body=nl2br(_(
|
|
"Sending invoices to Public Administration partners is not supported.\n"
|
|
"The IT EDI XML file is generated, please sign the document and upload it "
|
|
"through the 'Fatture e Corrispettivi' portal of the Tax Agency."
|
|
)))
|
|
else:
|
|
move.l10n_it_edi_state = 'being_sent'
|
|
files_to_upload.append({'filename': filename, 'xml': content})
|
|
filename_move[filename] = move
|
|
|
|
# Upload files
|
|
try:
|
|
results = self._l10n_it_edi_upload(files_to_upload)
|
|
except AccountEdiProxyError as e:
|
|
messages_to_log = []
|
|
for filename in filename_move:
|
|
unsent_move = filename_move[filename]
|
|
unsent_move.l10n_it_edi_state = False
|
|
text_message = _("Error uploading the e-invoice file %(file)s.\n%(error)s", file=filename, error=e.message)
|
|
html_message = nl2br(text_message)
|
|
unsent_move.l10n_it_edi_header = text_message
|
|
unsent_move.sudo().message_post(body=html_message)
|
|
messages_to_log.append(text_message)
|
|
raise UserError("\n".join(messages_to_log)) from e
|
|
|
|
# Handle results
|
|
for filename, vals in results.items():
|
|
sent_move = filename_move[filename]
|
|
if 'error' in vals:
|
|
sent_move.l10n_it_edi_state = False
|
|
sent_move.l10n_it_edi_transaction = False
|
|
message = nl2br(_("Error uploading the e-invoice file %(file)s.\n%(error)s", file=filename, error=vals['error']))
|
|
else:
|
|
is_demo = vals['id_transaction'] == 'demo'
|
|
sent_move.l10n_it_edi_state = 'processing'
|
|
sent_move.l10n_it_edi_transaction = vals['id_transaction']
|
|
message = (
|
|
_("We are simulating the sending of the e-invoice file %s, as we are in demo mode.", filename)
|
|
if is_demo else _("The e-invoice file %s was sent to the SdI for processing.", filename))
|
|
sent_move.l10n_it_edi_header = message
|
|
sent_move.sudo().message_post(body=message)
|
|
|
|
def _l10n_it_edi_upload(self, files):
|
|
'''Upload files to the SdI.
|
|
|
|
:param files: A list of dictionary {filename, base64_xml}.
|
|
:returns: A dictionary.
|
|
* message: Message from fatturapa.
|
|
* transactionId: The fatturapa ID of this request.
|
|
* error: An eventual error.
|
|
'''
|
|
if not files:
|
|
return {}
|
|
proxy_user = self.company_id.l10n_it_edi_proxy_user_id
|
|
proxy_user.ensure_one()
|
|
if proxy_user.edi_mode == 'demo':
|
|
return {file_data['filename']: {'id_transaction': 'demo'} for file_data in files}
|
|
|
|
ERRORS = {'EI01': _('Attached file is empty'),
|
|
'EI02': _('Service momentarily unavailable'),
|
|
'EI03': _('Unauthorized user')}
|
|
|
|
server_url = proxy_user._get_server_url()
|
|
results = proxy_user._make_request(
|
|
f'{server_url}/api/l10n_it_edi/1/out/SdiRiceviFile',
|
|
params={'files': files})
|
|
|
|
for filename, vals in results.items():
|
|
if 'error' in vals:
|
|
results[filename]['error'] = ERRORS.get(vals.get('error'), _("Unknown error"))
|
|
|
|
return results
|
|
|
|
# -------------------------------------------------------------------------
|
|
# EDI: Update notifications
|
|
# -------------------------------------------------------------------------
|
|
|
|
def _l10n_it_edi_update_send_state(self):
|
|
''' Check if the current invoices have been processed by the SdI. '''
|
|
proxy_user = self.company_id.l10n_it_edi_proxy_user_id
|
|
if proxy_user.edi_mode == 'demo':
|
|
for move in self:
|
|
filename = move.l10n_it_edi_attachment_id and move.l10n_it_edi_attachment_id.name or '???'
|
|
self._l10n_it_edi_write_send_state(
|
|
transformed_notification={
|
|
'l10n_it_edi_state': 'forwarded',
|
|
'l10n_it_edi_transaction': f'demo_{uuid.uuid4()}',
|
|
'send_ack_to_edi_proxy': False,
|
|
'date': fields.Date.today(),
|
|
'filename': filename},
|
|
message=_("The e-invoice file %s has been sent in Demo EDI mode.", filename))
|
|
return
|
|
|
|
server_url = proxy_user._get_server_url()
|
|
try:
|
|
notifications = proxy_user._make_request(
|
|
f'{server_url}/api/l10n_it_edi/1/in/TrasmissioneFatture',
|
|
params={'ids_transaction': self.mapped("l10n_it_edi_transaction")})
|
|
except AccountEdiProxyError as pe:
|
|
raise UserError(_("An error occurred while downloading updates from the Proxy Server: (%(code)s) %(message)s", code=pe.code, message=pe.message)) from pe
|
|
|
|
for _id_transaction, notification in notifications.items():
|
|
encrypted_update_content = notification.get('file')
|
|
encryption_key = notification.get('key')
|
|
if (encrypted_update_content and encryption_key):
|
|
notification['xml_content'] = proxy_user._decrypt_data(encrypted_update_content, encryption_key)
|
|
|
|
acks = {'transaction_ids': [], 'states': []}
|
|
for move in self:
|
|
notification = notifications[move.l10n_it_edi_transaction]
|
|
parsed_notification = move._l10n_it_edi_parse_notification(notification)
|
|
transformed_notification = move._l10n_it_edi_transform_notification(parsed_notification)
|
|
message = move._l10n_it_edi_get_message(transformed_notification)
|
|
move._l10n_it_edi_write_send_state(transformed_notification, message)
|
|
if (
|
|
transformed_notification.get('send_ack_to_edi_proxy')
|
|
and (id_transaction_to_ack := transformed_notification.get('l10n_it_edi_transaction'))
|
|
and (ack_state := transformed_notification.get('l10n_it_edi_state'))
|
|
):
|
|
acks['transaction_ids'].append(id_transaction_to_ack)
|
|
acks['states'].append(ack_state)
|
|
|
|
if acks:
|
|
transaction_ids = acks['transaction_ids']
|
|
states = acks['states']
|
|
try:
|
|
proxy_user._make_request(
|
|
f'{server_url}/api/l10n_it_edi/1/ack',
|
|
params={'transaction_ids': transaction_ids, 'states': states})
|
|
except AccountEdiProxyError as pe:
|
|
raise UserError(_("An error occurred while downloading updates from the Proxy Server: (%(code)s) %(message)s", code=pe.code, message=pe.message)) from pe
|
|
|
|
def _l10n_it_edi_parse_notification(self, notification):
|
|
sdi_state = notification.get('state', '')
|
|
if not (xml_content := notification.get('xml_content')):
|
|
return {'sdi_state': sdi_state}
|
|
|
|
decrypted_update_content = etree.fromstring(xml_content)
|
|
outcome = get_text(decrypted_update_content, './/Esito')
|
|
date_arrival = get_datetime(decrypted_update_content, './/DataOraRicezione') or fields.Date.today()
|
|
errors = [(
|
|
get_text(error_element, '//Codice'),
|
|
get_text(error_element, '//Descrizione'),
|
|
) for error_element in decrypted_update_content.xpath('//Errore')]
|
|
filename = get_text(decrypted_update_content, './/NomeFile')
|
|
|
|
return {
|
|
'sdi_state': sdi_state,
|
|
'errors': errors,
|
|
'outcome': outcome,
|
|
'date': date_arrival,
|
|
'filename': filename,
|
|
}
|
|
|
|
def _l10n_it_edi_transform_notification(self, parsed_notification):
|
|
""" Reads the notification XML coming from the EDI Proxy Server
|
|
Recovers information about the new state.
|
|
Computes whether the EDI Proxy Server is to be acked,
|
|
and whether the id_transaction has to be reset.
|
|
"""
|
|
self.ensure_one()
|
|
state_map = {
|
|
'not_found': False,
|
|
'awaiting_outcome': 'processing',
|
|
'notificaScarto': 'rejected',
|
|
'ricevutaConsegna': 'forwarded',
|
|
'forward_attempt': 'forward_attempt',
|
|
'notificaMancataConsegna': 'forward_failed',
|
|
('notificaEsito', 'EC01'): 'accepted_by_pa_partner',
|
|
('notificaEsito', 'EC02'): 'rejected_by_pa_partner',
|
|
'notificaDecorrenzaTermini': 'accepted_by_pa_partner_after_expiry',
|
|
}
|
|
sdi_state = parsed_notification['sdi_state']
|
|
filename = parsed_notification.get('filename')
|
|
errors = parsed_notification.get('errors', [])
|
|
date = parsed_notification.get('date', fields.Date.today())
|
|
if not filename and self.l10n_it_edi_attachment_id:
|
|
filename = self.l10n_it_edi_attachment_id.name
|
|
outcome = parsed_notification.get('outcome', False)
|
|
if not outcome:
|
|
new_state = state_map.get(sdi_state, False)
|
|
else:
|
|
new_state = state_map.get((sdi_state, outcome), False)
|
|
|
|
parsed_notification.update({
|
|
'l10n_it_edi_state': new_state,
|
|
'l10n_it_edi_transaction': False if new_state in (False, 'rejected') else self.l10n_it_edi_transaction,
|
|
'send_ack_to_edi_proxy': bool(new_state),
|
|
'date': date,
|
|
'errors': errors,
|
|
'filename': filename,
|
|
})
|
|
return parsed_notification
|
|
|
|
def _l10n_it_edi_write_send_state(self, transformed_notification, message):
|
|
""" Update the record with the data coming from the IAP server.
|
|
Eventually post the message.
|
|
Commit the transaction.
|
|
"""
|
|
self.ensure_one()
|
|
old_state = self.l10n_it_edi_state
|
|
new_state = transformed_notification['l10n_it_edi_state']
|
|
self.write({
|
|
'l10n_it_edi_state': new_state,
|
|
'l10n_it_edi_transaction': transformed_notification['l10n_it_edi_transaction'],
|
|
'l10n_it_edi_header': message or False,
|
|
})
|
|
|
|
if message and old_state != new_state:
|
|
self.with_context(no_new_invoice=True).sudo().message_post(body=message)
|
|
|
|
if new_state == 'rejected':
|
|
self.l10n_it_edi_attachment_file = False
|
|
|
|
self.env.cr.commit()
|
|
|
|
def _l10n_it_edi_get_message(self, transformed_notification):
|
|
""" The status change will be notified in the chatter of the move.
|
|
Compute the message from the notification information coming from the EDI Proxy Server
|
|
"""
|
|
self.ensure_one()
|
|
partner = self.commercial_partner_id
|
|
partner_name = partner.display_name
|
|
filename = transformed_notification['filename']
|
|
new_state = transformed_notification['l10n_it_edi_state']
|
|
if new_state == 'rejected':
|
|
DUPLICATE_MOVE = '00404'
|
|
DUPLICATE_FILENAME = '00002'
|
|
error_descriptions = []
|
|
for error_code, error_description in transformed_notification['errors']:
|
|
error_description_copy = error_description
|
|
if error_code == DUPLICATE_MOVE:
|
|
error_description_copy = _(
|
|
"The e-invoice file %(file)s is duplicated.\n"
|
|
"Original message from the SdI: %(message)s",
|
|
file=filename, message=error_description_copy)
|
|
elif error_code == DUPLICATE_FILENAME:
|
|
error_description_copy = _(
|
|
"The e-invoice filename %(file)s is duplicated. Please check the FatturaPA Filename sequence.\n"
|
|
"Original message from the SdI: %(message)s",
|
|
file=filename, message=error_description_copy)
|
|
error_descriptions.append(error_description_copy)
|
|
|
|
return self._l10n_it_edi_format_errors(_('The e-invoice has been refused by the SdI.'), error_descriptions)
|
|
|
|
elif partner._l10n_it_edi_is_public_administration():
|
|
pa_specific_map = {
|
|
'forwarded': nl2br(_(
|
|
"The e-invoice file %(file)s was succesfully sent to the SdI.\n"
|
|
"%(partner)s has 15 days to accept or reject it.",
|
|
file=filename, partner=partner_name)),
|
|
'forward_attempt': nl2br(_(
|
|
"The e-invoice file %(file)s can't be forward to %(partner)s (Public Administration) by the SdI at the moment.\n"
|
|
"It will try again for 10 days, after which it will be considered accepted, but "
|
|
"you will still have to send it by post or e-mail.",
|
|
file=filename, partner=partner_name)),
|
|
'accepted_by_pa_partner_after_expiry': nl2br(_(
|
|
"The e-invoice file %(file)s is succesfully sent to the SdI. The invoice is now considered fiscally relevant.\n"
|
|
"The %(partner)s (Public Administration) had 15 days to either accept or refused this document,"
|
|
"but since they did not reply, it's now considered accepted.",
|
|
file=filename, partner=partner_name)),
|
|
'rejected_by_pa_partner': nl2br(_(
|
|
"The e-invoice file %(file)s has been refused by %(partner)s (Public Administration).\n"
|
|
"You have 5 days from now to issue a full refund for this invoice, "
|
|
"then contact the PA partner to create a new one according to their "
|
|
"requests and submit it.",
|
|
file=filename, partner=partner_name)),
|
|
'accepted_by_pa_partner': _(
|
|
"The e-invoice file %(file)s has been accepted by %(partner)s (Public Administration), a payment will be issued soon",
|
|
file=filename, partner=partner_name),
|
|
}
|
|
if pa_specific_message := pa_specific_map.get(new_state):
|
|
return pa_specific_message
|
|
|
|
new_state_messages_map = {
|
|
False: _(
|
|
"The e-invoice file %s has not been found on the EDI Proxy server.", filename),
|
|
'processing': nl2br(_(
|
|
"The e-invoice file %s was sent to the SdI for validation.\n"
|
|
"It is not yet considered accepted, please wait further notifications.",
|
|
filename)),
|
|
'forwarded': _(
|
|
"The e-invoice file %(file)s was accepted and succesfully forwarded it to %(partner)s by the SdI.",
|
|
file=filename, partner=partner_name),
|
|
'forward_attempt': nl2br(_(
|
|
"The e-invoice file %(file)s has been accepted by the SdI.\n"
|
|
"The SdI is trying to forward it to %(partner)s.\n"
|
|
"It will try for up to 2 days, after which you'll eventually "
|
|
"need to send it the invoice to the partner by post or e-mail.",
|
|
file=filename, partner=partner_name)),
|
|
'forward_failed': nl2br(_(
|
|
"The e-invoice file %(file)s couldn't be forwarded to %(partner)s.\n"
|
|
"Please remember to send it via post or e-mail.",
|
|
file=filename, partner=partner_name))
|
|
}
|
|
return new_state_messages_map.get(new_state)
|