234 lines
10 KiB
Python
234 lines
10 KiB
Python
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
import contextlib
|
|
import logging
|
|
import requests
|
|
from lxml import etree
|
|
from markupsafe import Markup
|
|
from hashlib import md5
|
|
from urllib import parse
|
|
|
|
from odoo import api, fields, models
|
|
from odoo.addons.account_peppol.tools.demo_utils import handle_demo
|
|
from odoo.addons.account.models.company import PEPPOL_LIST
|
|
|
|
TIMEOUT = 10
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class ResPartner(models.Model):
|
|
_inherit = 'res.partner'
|
|
|
|
invoice_sending_method = fields.Selection(
|
|
selection_add=[('peppol', 'by Peppol')],
|
|
)
|
|
available_peppol_sending_methods = fields.Json(compute='_compute_available_peppol_sending_methods')
|
|
available_peppol_edi_formats = fields.Json(compute='_compute_available_peppol_edi_formats')
|
|
peppol_verification_state = fields.Selection(
|
|
selection=[
|
|
('not_verified', 'Not verified yet'),
|
|
('not_valid', 'Not valid'), # does not exist on Peppol at all
|
|
('not_valid_format', 'Cannot receive this format'), # registered on Peppol but cannot receive the selected document type
|
|
('valid', 'Valid'),
|
|
],
|
|
string='Peppol endpoint verification',
|
|
company_dependent=True,
|
|
)
|
|
|
|
# -------------------------------------------------------------------------
|
|
# COMPUTE METHODS
|
|
# -------------------------------------------------------------------------
|
|
|
|
@api.depends_context('company')
|
|
@api.depends('company_id')
|
|
def _compute_available_peppol_sending_methods(self):
|
|
methods = dict(self._fields['invoice_sending_method'].selection)
|
|
if self.env.company.country_code not in PEPPOL_LIST:
|
|
methods.pop('peppol')
|
|
self.available_peppol_sending_methods = list(methods)
|
|
|
|
@api.depends_context('company')
|
|
@api.depends('invoice_sending_method')
|
|
def _compute_available_peppol_edi_formats(self):
|
|
for partner in self:
|
|
if partner.invoice_sending_method == 'peppol':
|
|
partner.available_peppol_edi_formats = self._get_peppol_formats()
|
|
else:
|
|
partner.available_peppol_edi_formats = list(dict(self._fields['invoice_edi_format'].selection))
|
|
|
|
# -------------------------------------------------------------------------
|
|
# HELPERS
|
|
# -------------------------------------------------------------------------
|
|
|
|
def _log_verification_state_update(self, company, old_value, new_value):
|
|
# log the update of the peppol verification state
|
|
# we do this instead of regular tracking because of the customized message
|
|
# and because we want to log the change for every company in the db
|
|
if old_value == new_value:
|
|
return
|
|
|
|
peppol_verification_state_field = self._fields['peppol_verification_state']
|
|
selection_values = dict(peppol_verification_state_field.selection)
|
|
old_label = selection_values[old_value] if old_value else False # get translated labels
|
|
new_label = selection_values[new_value] if new_value else False
|
|
|
|
body = Markup("""
|
|
<ul>
|
|
<li>
|
|
<span class='o-mail-Message-trackingOld me-1 px-1 text-muted fw-bold'>{old}</span>
|
|
<i class='o-mail-Message-trackingSeparator fa fa-long-arrow-right mx-1 text-600'/>
|
|
<span class='o-mail-Message-trackingNew me-1 fw-bold text-info'>{new}</span>
|
|
<span class='o-mail-Message-trackingField ms-1 fst-italic text-muted'>({field})</span>
|
|
<span class='o-mail-Message-trackingCompany ms-1 fst-italic text-muted'>({company})</span>
|
|
</li>
|
|
</ul>
|
|
""").format(
|
|
old=old_label,
|
|
new=new_label,
|
|
field=peppol_verification_state_field.string,
|
|
company=company.display_name,
|
|
)
|
|
self._message_log(body=body)
|
|
|
|
@api.model
|
|
def _get_participant_info(self, edi_identification):
|
|
hash_participant = md5(edi_identification.lower().encode()).hexdigest()
|
|
endpoint_participant = parse.quote_plus(f"iso6523-actorid-upis::{edi_identification}")
|
|
edi_mode = self.env.company._get_peppol_edi_mode()
|
|
sml_zone = 'acc.edelivery' if edi_mode == 'test' else 'edelivery'
|
|
smp_url = f"http://B-{hash_participant}.iso6523-actorid-upis.{sml_zone}.tech.ec.europa.eu/{endpoint_participant}"
|
|
|
|
try:
|
|
response = requests.get(smp_url, timeout=TIMEOUT)
|
|
response.raise_for_status()
|
|
except requests.exceptions.RequestException as e:
|
|
_logger.debug(e)
|
|
return None
|
|
return etree.fromstring(response.content)
|
|
|
|
@api.model
|
|
def _check_peppol_participant_exists(self, participant_info, edi_identification, check_company=False):
|
|
participant_identifier = participant_info.findtext('{*}ParticipantIdentifier')
|
|
service_metadata = participant_info.find('.//{*}ServiceMetadataReference')
|
|
service_href = ''
|
|
if service_metadata is not None:
|
|
service_href = service_metadata.attrib.get('href', '')
|
|
|
|
if edi_identification != participant_identifier or 'hermes-belgium' in service_href:
|
|
# all Belgian companies are pre-registered on hermes-belgium, so they will
|
|
# technically have an existing SMP url but they are not real Peppol participants
|
|
return False
|
|
|
|
if check_company:
|
|
# if we are only checking company's existence on the network, we don't care about what documents they can receive
|
|
if not service_href:
|
|
return True
|
|
|
|
access_point_contact = True
|
|
with contextlib.suppress(requests.exceptions.RequestException, etree.XMLSyntaxError):
|
|
response = requests.get(service_href, timeout=TIMEOUT)
|
|
if response.status_code == 200:
|
|
access_point_info = etree.fromstring(response.content)
|
|
access_point_contact = access_point_info.findtext('.//{*}TechnicalContactUrl') or access_point_info.findtext('.//{*}TechnicalInformationUrl')
|
|
return access_point_contact
|
|
|
|
return True
|
|
|
|
def _check_document_type_support(self, participant_info, ubl_cii_format):
|
|
service_references = participant_info.findall(
|
|
'{*}ServiceMetadataReferenceCollection/{*}ServiceMetadataReference'
|
|
)
|
|
document_type = self.env['account.edi.xml.ubl_21']._get_customization_ids()[ubl_cii_format]
|
|
for service in service_references:
|
|
if document_type in parse.unquote_plus(service.attrib.get('href', '')):
|
|
return True
|
|
return False
|
|
|
|
def _update_peppol_state_per_company(self, vals=None):
|
|
partners = self.env['res.partner']
|
|
if vals is None:
|
|
partners = self.filtered(lambda p: all([p.peppol_eas, p.peppol_endpoint, p.is_ubl_format, p.country_code in PEPPOL_LIST]))
|
|
elif {'peppol_eas', 'peppol_endpoint', 'invoice_edi_format'}.intersection(vals.keys()):
|
|
partners = self.filtered(lambda p: p.country_code in PEPPOL_LIST)
|
|
|
|
all_companies = None
|
|
for partner in partners.sudo():
|
|
if partner.company_id:
|
|
partner.button_account_peppol_check_partner_endpoint(company=partner.company_id)
|
|
continue
|
|
|
|
if all_companies is None:
|
|
all_companies = self.env['res.company'].sudo().search([])
|
|
|
|
for company in all_companies:
|
|
partner.button_account_peppol_check_partner_endpoint(company=company)
|
|
|
|
# -------------------------------------------------------------------------
|
|
# LOW-LEVEL METHODS
|
|
# -------------------------------------------------------------------------
|
|
|
|
def write(self, vals):
|
|
res = super().write(vals)
|
|
self._update_peppol_state_per_company(vals=vals)
|
|
return res
|
|
|
|
@api.model_create_multi
|
|
def create(self, vals_list):
|
|
res = super().create(vals_list)
|
|
if res:
|
|
res._update_peppol_state_per_company()
|
|
return res
|
|
|
|
# -------------------------------------------------------------------------
|
|
# BUSINESS ACTIONS
|
|
# -------------------------------------------------------------------------
|
|
|
|
@handle_demo
|
|
def button_account_peppol_check_partner_endpoint(self, company=None):
|
|
""" A basic check for whether a participant is reachable at the given
|
|
Peppol participant ID - peppol_eas:peppol_endpoint (ex: '9999:test')
|
|
The SML (Service Metadata Locator) assigns a DNS name to each peppol participant.
|
|
This DNS name resolves into the SMP (Service Metadata Publisher) of the participant.
|
|
The DNS address is of the following form:
|
|
- "http://B-" + hexstring(md5(lowercase(ID-VALUE))) + "." + ID-SCHEME + "." + SML-ZONE-NAME + "/" + url_encoded(ID-SCHEME + "::" + ID-VALUE)
|
|
(ref:https://peppol.helger.com/public/locale-en_US/menuitem-docs-doc-exchange)
|
|
"""
|
|
self.ensure_one()
|
|
if not company:
|
|
company = self.env.company
|
|
|
|
self_partner = self.with_company(company)
|
|
old_value = self_partner.peppol_verification_state
|
|
self_partner.peppol_verification_state = self._get_peppol_verification_state(
|
|
self.peppol_endpoint,
|
|
self.peppol_eas,
|
|
self_partner.invoice_edi_format
|
|
)
|
|
if self_partner.peppol_verification_state == 'valid':
|
|
self_partner.invoice_sending_method = 'peppol'
|
|
|
|
self._log_verification_state_update(company, old_value, self_partner.peppol_verification_state)
|
|
return False
|
|
|
|
@api.model
|
|
@handle_demo
|
|
def _get_peppol_verification_state(self, peppol_endpoint, peppol_eas, invoice_edi_format):
|
|
if not (peppol_eas and peppol_endpoint) or invoice_edi_format not in self._get_peppol_formats():
|
|
return False
|
|
|
|
edi_identification = f"{peppol_eas}:{peppol_endpoint}".lower()
|
|
participant_info = self._get_participant_info(edi_identification)
|
|
if participant_info is None:
|
|
return 'not_valid'
|
|
else:
|
|
is_participant_on_network = self._check_peppol_participant_exists(participant_info, edi_identification)
|
|
if is_participant_on_network:
|
|
is_valid_format = self._check_document_type_support(participant_info, invoice_edi_format)
|
|
if is_valid_format:
|
|
return 'valid'
|
|
else:
|
|
return 'not_valid_format'
|
|
else:
|
|
return 'not_valid'
|