Odoo18-Base/addons/l10n_eg_edi_eta/models/eta_thumb_drive.py

176 lines
7.2 KiB
Python
Raw Permalink Normal View History

2025-01-06 10:57:38 +07:00
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import base64
import hashlib
import json
import pytz
from asn1crypto import cms, core, x509, algos, tsp
from odoo import models, fields, _
from odoo.exceptions import ValidationError
class EtaThumbDrive(models.Model):
_name = 'l10n_eg_edi.thumb.drive'
_description = 'Thumb drive used to sign invoices in Egypt'
user_id = fields.Many2one('res.users', required=True, default=lambda self: self.env.user)
company_id = fields.Many2one('res.company', required=True, default=lambda self: self.env.company)
certificate = fields.Binary('ETA Certificate')
pin = fields.Char('ETA USB Pin', required=True)
access_token = fields.Char(required=True)
_sql_constraints = [
('user_drive_uniq', 'unique (user_id, company_id)', 'You can only have one thumb drive per user per company!'),
]
def action_sign_invoices(self, invoice_ids):
self.ensure_one()
sign_host = self._get_host()
to_sign_dict = dict()
for invoice_id in invoice_ids:
eta_invoice = json.loads(invoice_id.l10n_eg_eta_json_doc_id.raw)['request']
signed_attrs = self._generate_signed_attrs(eta_invoice, invoice_id.l10n_eg_signing_time)
to_sign_dict[invoice_id.id] = base64.b64encode(signed_attrs.dump()).decode()
return {
'type': 'ir.actions.client',
'tag': 'action_post_sign_invoice',
'params': {
'sign_host': sign_host,
'access_token': self.access_token,
'pin': self.pin,
'drive_id': self.id,
'invoices': json.dumps(to_sign_dict)
}
}
def action_set_certificate_from_usb(self):
self.ensure_one()
sign_host = self._get_host()
return {
'type': 'ir.actions.client',
'tag': 'action_get_drive_certificate',
'params': {
'sign_host': sign_host,
'access_token': self.access_token,
'pin': self.pin,
'drive_id': self.id
}
}
def set_certificate(self, certificate):
""" This is called from the browser to set the certificate"""
self.ensure_one()
self.certificate = certificate.encode()
return True
def set_signature_data(self, invoices):
""" This is called from the browser with the signed data from the local server """
invoices = json.loads(invoices)
for key, value in invoices.items():
invoice_id = self.env['account.move'].browse(int(key))
eta_invoice_json = json.loads(invoice_id.l10n_eg_eta_json_doc_id.raw)
signature = self._generate_cades_bes_signature(eta_invoice_json['request'], invoice_id.l10n_eg_signing_time,
base64.b64decode(value))
eta_invoice_json['request']['signatures'] = [{'signatureType': 'I', 'value': signature}]
invoice_id.l10n_eg_eta_json_doc_id.raw = json.dumps(eta_invoice_json)
invoice_id.l10n_eg_is_signed = True
return True
def _get_host(self):
# It should be on the loopback address or with a fully valid https host
# in order to be an exception to the mixed-content restrictions
sign_host = self.env['ir.config_parameter'].sudo().get_param('l10n_eg_eta.sign.host', 'http://localhost:8069')
if not sign_host:
raise ValidationError(_('Please define the host of sign tool.'))
return sign_host
def _serialize_for_signing(self, eta_inv):
if not isinstance(eta_inv, dict):
return json.dumps(str(eta_inv), ensure_ascii=False)
canonical_str = []
for key, value in eta_inv.items():
if not isinstance(value, list):
canonical_str.append(json.dumps(key, ensure_ascii=False).upper())
canonical_str.append(self._serialize_for_signing(value))
else:
canonical_str.append(json.dumps(key, ensure_ascii=False).upper())
for elem in value:
canonical_str.append(json.dumps(key, ensure_ascii=False).upper())
canonical_str.append(self._serialize_for_signing(elem))
return ''.join(canonical_str)
def _generate_signed_attrs(self, eta_invoice, signing_time):
cert = x509.Certificate.load(base64.b64decode(self.certificate))
data = hashlib.sha256(self._serialize_for_signing(eta_invoice).encode()).digest()
return cms.CMSAttributes([
cms.CMSAttribute({
'type': cms.CMSAttributeType('content_type'),
'values': ('digested_data',),
}),
cms.CMSAttribute({
'type': cms.CMSAttributeType('message_digest'),
'values': (data,),
}),
cms.CMSAttribute({
'type': tsp.CMSAttributeType('signing_certificate_v2'),
'values': ({
'certs': (tsp.ESSCertIDv2({
'hash_algorithm': algos.DigestAlgorithm({'algorithm': 'sha256'}),
'cert_hash': hashlib.sha256(cert.dump()).digest()
}),)
},),
}),
cms.CMSAttribute({
'type': cms.CMSAttributeType('signing_time'),
'values': (
cms.Time({'utc_time': core.UTCTime(signing_time.replace(tzinfo=pytz.UTC))}),)
}),
])
def _generate_signer_info(self, eta_invoice, signing_time, signature=False):
cert = x509.Certificate.load(base64.b64decode(self.certificate))
signer_info = {
'version': 'v1',
'sid': cms.SignerIdentifier({
'issuer_and_serial_number': cms.IssuerAndSerialNumber({
'issuer': cert.issuer,
'serial_number': cert.serial_number,
}),
}),
'digest_algorithm': algos.DigestAlgorithm({'algorithm': 'sha256'}),
'signature_algorithm': algos.SignedDigestAlgorithm({
'algorithm': 'sha256_rsa'
}),
'signed_attrs': self._generate_signed_attrs(eta_invoice, signing_time)
}
if signature:
signer_info['signature'] = signature
return signer_info
def _generate_cades_bes_signature(self, eta_invoice, signing_time, signature):
cert = x509.Certificate.load(base64.b64decode(self.certificate))
signed_data = {
'version': 'v3',
'digest_algorithms': cms.DigestAlgorithms((
algos.DigestAlgorithm({'algorithm': 'sha256'}),
)),
'encap_content_info': {
'content_type': 'digested_data',
},
'certificates': [cert],
'signer_infos': [
self._generate_signer_info(eta_invoice, signing_time, signature),
],
}
content_info = cms.ContentInfo({'content_type': cms.ContentType('signed_data'), 'content': cms.SignedData(signed_data)})
return base64.b64encode(content_info.dump()).decode()