Odoo18-Base/addons/l10n_hu_edi/models/l10n_hu_edi_connection.py
2025-01-06 10:57:38 +07:00

434 lines
20 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
from base64 import b64decode, b64encode
import binascii
from datetime import datetime, timedelta, timezone
import hashlib
import logging
import secrets
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import dateutil.parser
from lxml import etree
import requests
from odoo import _, release
from odoo.tools import cleanup_xml_node
_logger = logging.getLogger(__name__)
XML_NAMESPACES = {
'api': 'http://schemas.nav.gov.hu/OSA/3.0/api',
'common': 'http://schemas.nav.gov.hu/NTCA/1.0/common',
'base': 'http://schemas.nav.gov.hu/OSA/3.0/base',
'data': 'http://schemas.nav.gov.hu/OSA/3.0/data',
}
def format_bool(value):
return 'true' if value else 'false'
def format_timestamp(value):
return value.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
def decrypt_aes128(key, encrypted_token):
""" Decrypt AES-128-ECB encrypted bytes.
:param key bytes: the 128-bit key
:param encrypted_token bytes: the bytes to decrypt
:return: the decrypted bytes
"""
decryptor = Cipher(algorithms.AES(key), modes.ECB()).decryptor()
decrypted_token = decryptor.update(encrypted_token) + decryptor.finalize()
unpadder = padding.PKCS7(128).unpadder()
unpadded_token = unpadder.update(decrypted_token) + unpadder.finalize()
return unpadded_token
class L10nHuEdiConnectionError(Exception):
def __init__(self, errors, code=None):
if not isinstance(errors, list):
errors = [errors]
self.errors = errors
self.code = code
super().__init__('\n'.join(errors))
class L10nHuEdiConnection:
def __init__(self, env):
""" Methods to call NAV API endpoints.
Use this as a context manager (`with L10nHuEdiConnection(...) as connection`)
to ensure the TCP connection is closed when you are finished calling endpoints.
:param env: the Odoo environment
"""
self.env = env
self.session = requests.Session()
def __enter__(self):
return self
def __exit__(self, *args):
self.session.close()
def do_token_exchange(self, credentials):
""" Request a token for invoice submission.
:param credentials: a dictionary
{
'vat': str,
'mode': 'production' || 'test',
'username': str,
'password': str,
'signature_key': str,
'replacement_key': str
}
:return: a dictionary {'token': str, 'token_validity_to': datetime}
:raise: L10nHuEdiConnectionError
"""
if credentials['mode'] == 'demo':
return {'token': 'token', 'token_validity_to': datetime.utcnow() + timedelta(minutes=5)}
template_values = self._get_header_values(credentials)
request_data = self.env['ir.qweb']._render('l10n_hu_edi.token_exchange_request', template_values)
request_data = etree.tostring(cleanup_xml_node(request_data, remove_blank_nodes=False), xml_declaration=True, encoding='UTF-8')
response_xml = self._call_nav_endpoint(credentials['mode'], 'tokenExchange', request_data)
self._parse_error_response(response_xml)
encrypted_token = response_xml.findtext('api:encodedExchangeToken', namespaces=XML_NAMESPACES)
token_validity_to = response_xml.findtext('api:tokenValidityTo', namespaces=XML_NAMESPACES)
try:
# Convert into a naive UTC datetime, since Odoo can't store timezone-aware datetimes
token_validity_to = dateutil.parser.isoparse(token_validity_to).astimezone(timezone.utc).replace(tzinfo=None)
except ValueError:
_logger.warning('Could not parse token validity end timestamp!')
token_validity_to = datetime.utcnow() + timedelta(minutes=5)
if not encrypted_token:
raise L10nHuEdiConnectionError(_('Missing token in response from NAV.'))
try:
token = decrypt_aes128(credentials['replacement_key'].encode(), b64decode(encrypted_token.encode())).decode()
except ValueError as e:
raise L10nHuEdiConnectionError(_('Error during decryption of ExchangeToken.')) from e
return {'token': token, 'token_validity_to': token_validity_to}
def do_manage_invoice(self, credentials, token, invoice_operations):
""" Submit one or more invoices.
:param token: a token obtained via `do_token_exchange`
:param invoice_operations: a list of dictionaries:
{
'index': <index given to invoice>,
'operation': 'CREATE' or 'MODIFY',
'invoice_data': <XML data of the invoice as bytes>
}
:return str: The transaction code issued by NAV.
:raise: L10nHuEdiConnectionError, with code='timeout' if a timeout occurred.
"""
if credentials['mode'] == 'demo':
return secrets.token_hex(8).upper()
template_values = {
'exchangeToken': token,
'compressedContent': False,
'invoices': [],
}
invoice_hashes = []
for invoice_operation in invoice_operations:
invoice_data_b64 = b64encode(invoice_operation['invoice_data']).decode('utf-8')
template_values['invoices'].append({
'index': invoice_operation['index'],
'invoiceOperation': invoice_operation['operation'],
'invoiceData': invoice_data_b64,
})
invoice_hashes.append(self._calculate_invoice_hash(invoice_operation['operation'] + invoice_data_b64))
template_values.update(self._get_header_values(credentials, invoice_hashs=invoice_hashes))
request_data = self.env['ir.qweb']._render('l10n_hu_edi.manage_invoice_request', template_values)
request_data = etree.tostring(cleanup_xml_node(request_data, remove_blank_nodes=False), xml_declaration=True, encoding='UTF-8')
response_xml = self._call_nav_endpoint(credentials['mode'], 'manageInvoice', request_data, timeout=60)
self._parse_error_response(response_xml)
transaction_code = response_xml.findtext('api:transactionId', namespaces=XML_NAMESPACES)
if not transaction_code:
raise L10nHuEdiConnectionError(_('Invoice Upload failed: NAV did not return a Transaction ID.'))
return transaction_code
def do_query_transaction_status(self, credentials, transaction_code, return_original_request=False):
""" Query the status of a transaction.
:param transaction_code: the code of the transaction to query
:param return_original_request: whether to request the submitted invoice XML.
:return: a list of dicts {'index': str, 'invoice_status': str, 'business_validation_messages', 'technical_validation_messages'}
:raise: L10nHuEdiConnectionError
"""
if credentials['mode'] == 'demo':
invoices = self.env['account.move'].search([('l10n_hu_edi_transaction_code', '=', transaction_code)])
if any(m.l10n_hu_edi_state.startswith('cancel') for m in invoices):
return {
'processing_results': [
{
'index': str(i + 1),
'invoice_status': 'DONE',
'business_validation_messages': [{
'validation_result_code': 'INFO',
'validation_error_code': 'INFO_SINGLE_INVOICE_ANNULMENT',
'message': 'Egyedi számla technikai érvénytelenítése',
}],
'technical_validation_messages': [],
}
for i in range(len(invoices))
],
'annulment_status': 'VERIFICATION_DONE',
}
else:
return {
'processing_results': [
{
'index': str(i + 1),
'invoice_status': 'DONE',
'business_validation_messages': [],
'technical_validation_messages': [],
}
for i in range(len(invoices))
],
'annulment_status': None,
}
template_values = {
**self._get_header_values(credentials),
'transactionId': transaction_code,
'returnOriginalRequest': return_original_request,
}
request_data = self.env['ir.qweb']._render('l10n_hu_edi.query_transaction_status_request', template_values)
request_data = etree.tostring(cleanup_xml_node(request_data, remove_blank_nodes=False), xml_declaration=True, encoding='UTF-8')
response_xml = self._call_nav_endpoint(credentials['mode'], 'queryTransactionStatus', request_data)
self._parse_error_response(response_xml)
results = {
'processing_results': [],
'annulment_status': response_xml.findtext('api:processingResults/api:annulmentData/api:annulmentVerificationStatus', namespaces=XML_NAMESPACES),
}
for processing_result_xml in response_xml.iterfind('api:processingResults/api:processingResult', namespaces=XML_NAMESPACES):
processing_result = {
'index': processing_result_xml.findtext('api:index', namespaces=XML_NAMESPACES),
'invoice_status': processing_result_xml.findtext('api:invoiceStatus', namespaces=XML_NAMESPACES),
'business_validation_messages': [],
'technical_validation_messages': [],
}
for message_xml in processing_result_xml.iterfind('api:businessValidationMessages', namespaces=XML_NAMESPACES):
processing_result['business_validation_messages'].append({
'validation_result_code': message_xml.findtext('api:validationResultCode', namespaces=XML_NAMESPACES),
'validation_error_code': message_xml.findtext('api:validationErrorCode', namespaces=XML_NAMESPACES),
'message': message_xml.findtext('api:message', namespaces=XML_NAMESPACES),
})
for message_xml in processing_result_xml.iterfind('api:technicalValidationMessages', namespaces=XML_NAMESPACES):
processing_result['technical_validation_messages'].append({
'validation_result_code': message_xml.findtext('common:validationResultCode', namespaces=XML_NAMESPACES),
'validation_error_code': message_xml.findtext('common:validationErrorCode', namespaces=XML_NAMESPACES),
'message': message_xml.findtext('common:message', namespaces=XML_NAMESPACES),
})
if return_original_request:
try:
original_file = b64decode(processing_result_xml.findtext('api:originalRequest', namespaces=XML_NAMESPACES))
original_xml = etree.fromstring(original_file)
except binascii.Error as e:
raise L10nHuEdiConnectionError(str(e)) from e
except etree.ParserError as e:
raise L10nHuEdiConnectionError(str(e)) from e
processing_result.update({
'original_file': original_file.decode(),
'original_xml': original_xml,
})
results['processing_results'].append(processing_result)
return results
def do_query_transaction_list(self, credentials, datetime_from, datetime_to, page=1):
""" Query the transactions that were submitted in a given time interval.
:param datetime_from: start of the time interval to query
:param datetime_to: end of the time interval to query
:return: a dict {'transaction_codes': list[str], 'available_pages': int}
:raise: L10nHuEdiConnectionError
"""
if credentials['mode'] == 'demo':
return {"transactions": [], "available_pages": 1}
template_values = {
**self._get_header_values(credentials),
'page': page,
'dateTimeFrom': format_timestamp(datetime_from),
'dateTimeTo': format_timestamp(datetime_to),
}
request_data = self.env['ir.qweb']._render('l10n_hu_edi.query_transaction_list_request', template_values)
request_data = etree.tostring(cleanup_xml_node(request_data, remove_blank_nodes=False), xml_declaration=True, encoding='UTF-8')
response_xml = self._call_nav_endpoint(credentials['mode'], 'queryTransactionList', request_data)
self._parse_error_response(response_xml)
available_pages = response_xml.findtext('api:transactionListResult/api:availablePage', namespaces=XML_NAMESPACES)
try:
available_pages = int(available_pages)
except ValueError:
available_pages = 1
transactions = []
for transaction_xml in response_xml.iterfind('api:transactionListResult/api:transaction', namespaces=XML_NAMESPACES):
try:
send_time = datetime.fromisoformat(transaction_xml.findtext('api:insDate', namespaces=XML_NAMESPACES).replace('Z', ''))
except ValueError as e:
raise L10nHuEdiConnectionError(_('Could not parse time of previous transaction')) from e
transactions.append({
'transaction_code': transaction_xml.findtext('api:transactionId', namespaces=XML_NAMESPACES),
'annulment': transaction_xml.findtext('api:technicalAnnulment', namespaces=XML_NAMESPACES) == 'true',
'username': transaction_xml.findtext('api:insCusUser', namespaces=XML_NAMESPACES),
'source': transaction_xml.findtext('api:source', namespaces=XML_NAMESPACES),
'send_time': send_time,
})
return {"transactions": transactions, "available_pages": available_pages}
def do_manage_annulment(self, credentials, token, annulment_operations):
""" Request technical annulment of one or more invoices.
:param token: a token obtained via `do_token_exchange`
:param annulment_operations: a list of dictionaries:
{
'index': <index given to invoice>,
'annulmentReference': the name of the invoice to annul,
'annulmentCode': one of ('ERRATIC_DATA', 'ERRATIC_INVOICE_NUMBER', 'ERRATIC_INVOICE_ISSUE_DATE', 'ERRATIC_ELECTRONIC_HASH_VALUE'),
'annulmentReason': a plain-text explanation of the reason for annulment,
}
:return str: The transaction code issued by NAV.
:raise: L10nHuEdiConnectionError, with code='timeout' if a timeout occurred.
"""
if credentials['mode'] == 'demo':
return secrets.token_hex(8).upper()
template_values = {
'exchangeToken': token,
'annulments': []
}
annulment_hashes = []
for annulment_operation in annulment_operations:
annulment_operation['annulmentTimestamp'] = format_timestamp(datetime.utcnow())
annulment_data = self.env['ir.qweb']._render('l10n_hu_edi.invoice_annulment', annulment_operation)
annulment_data_b64 = b64encode(annulment_data.encode()).decode('utf-8')
template_values['annulments'].append({
'index': annulment_operation['index'],
'annulmentOperation': 'ANNUL',
'invoiceAnnulment': annulment_data_b64,
})
annulment_hashes.append(self._calculate_invoice_hash('ANNUL' + annulment_data_b64))
template_values.update(self._get_header_values(credentials, invoice_hashs=annulment_hashes))
request_data = self.env['ir.qweb']._render('l10n_hu_edi.manage_annulment_request', template_values)
request_data = etree.tostring(cleanup_xml_node(request_data, remove_blank_nodes=False), xml_declaration=True, encoding='UTF-8')
response_xml = self._call_nav_endpoint(credentials['mode'], 'manageAnnulment', request_data, timeout=60)
self._parse_error_response(response_xml)
transaction_code = response_xml.findtext('api:transactionId', namespaces=XML_NAMESPACES)
if not transaction_code:
raise L10nHuEdiConnectionError(_('Invoice Upload failed: NAV did not return a Transaction ID.'))
return transaction_code
# === Helpers: XML generation === #
def _get_header_values(self, credentials, invoice_hashs=None):
timestamp = datetime.utcnow()
request_id = 'ODOO' + secrets.token_hex(13)
request_signature = self._calculate_request_signature(credentials['signature_key'], request_id, timestamp, invoice_hashs=invoice_hashs)
odoo_version = release.version
module_version = self.env['ir.module.module'].get_module_info('l10n_hu_edi').get('version').replace('saas~', '').replace('.', '')
return {
'requestId': request_id,
'timestamp': format_timestamp(timestamp),
'login': credentials['username'],
'passwordHash': self._calculate_password_hash(credentials['password']),
'taxNumber': credentials['vat'][:8],
'requestSignature': request_signature,
'softwareId': f'BE477472701-{module_version}'[:18],
'softwareName': 'Odoo Enterprise',
'softwareOperation': 'ONLINE_SERVICE',
'softwareMainVersion': odoo_version,
'softwareDevName': 'Odoo SA',
'softwareDevContact': 'andu@odoo.com',
'softwareDevCountryCode': 'BE',
'softwareDevTaxNumber': '477472701',
'format_bool': format_bool,
}
def _calculate_password_hash(self, password):
return hashlib.sha512(password.encode()).hexdigest().upper()
def _calculate_invoice_hash(self, value):
return hashlib.sha3_512(value.encode()).hexdigest().upper()
def _calculate_request_signature(self, key_sign, reqid, reqdate, invoice_hashs=None):
strings = [reqid, reqdate.strftime('%Y%m%d%H%M%S'), key_sign]
# merge the invoice CRCs if we got
if invoice_hashs:
strings += invoice_hashs
# return back the uppered hexdigest
return self._calculate_invoice_hash(''.join(strings))
# === Helpers: HTTP Post === #
def _call_nav_endpoint(self, mode, service, data, timeout=20):
if mode == 'production':
url = 'https://api.onlineszamla.nav.gov.hu/invoiceService/v3/'
elif mode == 'test':
url = 'https://api-test.onlineszamla.nav.gov.hu/invoiceService/v3/'
else:
raise L10nHuEdiConnectionError(_('Mode should be Production or Test!'))
headers = {'content-type': 'application/xml', 'accept': 'application/xml'}
try:
response_object = self.session.post(f'{url}{service}', data=data, headers=headers, timeout=timeout)
except requests.Timeout as e:
raise L10nHuEdiConnectionError(_('Connection to NAV servers timed out.'), code='timeout') from e
except requests.RequestException as e:
raise L10nHuEdiConnectionError(str(e)) from e
try:
response_xml = etree.fromstring(response_object.text.encode())
except etree.ParseError as e:
raise L10nHuEdiConnectionError(_('Invalid NAV response!')) from e
return response_xml
# === Helpers: Response parsing === #
def _parse_error_response(self, response_xml):
error_code = response_xml.findtext('common:result/common:errorCode', namespaces=XML_NAMESPACES)
message = response_xml.findtext('common:result/common:message', namespaces=XML_NAMESPACES)
if error_code:
errors = [f'{error_code}: {message}']
for message_xml in response_xml.iterfind('api:technicalValidationMessages', namespaces=XML_NAMESPACES):
message = message_xml.findtext('api:message', namespaces=XML_NAMESPACES)
error_code = message_xml.findtext('api:validationErrorCode', namespaces=XML_NAMESPACES)
errors.append(f'{error_code}: {message}')
raise L10nHuEdiConnectionError(errors)