176 lines
7.2 KiB
Python
176 lines
7.2 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
import base64
|
||
|
import hashlib
|
||
|
import json
|
||
|
|
||
|
import pytz
|
||
|
|
||
|
from asn1crypto import cms, core, x509, algos, tsp
|
||
|
|
||
|
from odoo import models, fields, _
|
||
|
from odoo.exceptions import ValidationError
|
||
|
|
||
|
|
||
|
class EtaThumbDrive(models.Model):
|
||
|
_name = 'l10n_eg_edi.thumb.drive'
|
||
|
_description = 'Thumb drive used to sign invoices in Egypt'
|
||
|
|
||
|
user_id = fields.Many2one('res.users', required=True, default=lambda self: self.env.user)
|
||
|
company_id = fields.Many2one('res.company', required=True, default=lambda self: self.env.company)
|
||
|
certificate = fields.Binary('ETA Certificate')
|
||
|
pin = fields.Char('ETA USB Pin', required=True)
|
||
|
access_token = fields.Char(required=True)
|
||
|
|
||
|
_sql_constraints = [
|
||
|
('user_drive_uniq', 'unique (user_id, company_id)', 'You can only have one thumb drive per user per company!'),
|
||
|
]
|
||
|
|
||
|
def action_sign_invoices(self, invoice_ids):
|
||
|
self.ensure_one()
|
||
|
sign_host = self._get_host()
|
||
|
|
||
|
to_sign_dict = dict()
|
||
|
for invoice_id in invoice_ids:
|
||
|
eta_invoice = json.loads(invoice_id.l10n_eg_eta_json_doc_id.raw)['request']
|
||
|
signed_attrs = self._generate_signed_attrs(eta_invoice, invoice_id.l10n_eg_signing_time)
|
||
|
to_sign_dict[invoice_id.id] = base64.b64encode(signed_attrs.dump()).decode()
|
||
|
|
||
|
return {
|
||
|
'type': 'ir.actions.client',
|
||
|
'tag': 'action_post_sign_invoice',
|
||
|
'params': {
|
||
|
'sign_host': sign_host,
|
||
|
'access_token': self.access_token,
|
||
|
'pin': self.pin,
|
||
|
'drive_id': self.id,
|
||
|
'invoices': json.dumps(to_sign_dict)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
def action_set_certificate_from_usb(self):
|
||
|
self.ensure_one()
|
||
|
sign_host = self._get_host()
|
||
|
|
||
|
return {
|
||
|
'type': 'ir.actions.client',
|
||
|
'tag': 'action_get_drive_certificate',
|
||
|
'params': {
|
||
|
'sign_host': sign_host,
|
||
|
'access_token': self.access_token,
|
||
|
'pin': self.pin,
|
||
|
'drive_id': self.id
|
||
|
}
|
||
|
}
|
||
|
|
||
|
def set_certificate(self, certificate):
|
||
|
""" This is called from the browser to set the certificate"""
|
||
|
self.ensure_one()
|
||
|
self.certificate = certificate.encode()
|
||
|
return True
|
||
|
|
||
|
def set_signature_data(self, invoices):
|
||
|
""" This is called from the browser with the signed data from the local server """
|
||
|
invoices = json.loads(invoices)
|
||
|
for key, value in invoices.items():
|
||
|
invoice_id = self.env['account.move'].browse(int(key))
|
||
|
eta_invoice_json = json.loads(invoice_id.l10n_eg_eta_json_doc_id.raw)
|
||
|
|
||
|
signature = self._generate_cades_bes_signature(eta_invoice_json['request'], invoice_id.l10n_eg_signing_time,
|
||
|
base64.b64decode(value))
|
||
|
|
||
|
eta_invoice_json['request']['signatures'] = [{'signatureType': 'I', 'value': signature}]
|
||
|
invoice_id.l10n_eg_eta_json_doc_id.raw = json.dumps(eta_invoice_json)
|
||
|
invoice_id.l10n_eg_is_signed = True
|
||
|
return True
|
||
|
|
||
|
def _get_host(self):
|
||
|
# It should be on the loopback address or with a fully valid https host
|
||
|
# in order to be an exception to the mixed-content restrictions
|
||
|
sign_host = self.env['ir.config_parameter'].sudo().get_param('l10n_eg_eta.sign.host', 'http://localhost:8069')
|
||
|
if not sign_host:
|
||
|
raise ValidationError(_('Please define the host of sign tool.'))
|
||
|
return sign_host
|
||
|
|
||
|
def _serialize_for_signing(self, eta_inv):
|
||
|
if not isinstance(eta_inv, dict):
|
||
|
return json.dumps(str(eta_inv), ensure_ascii=False)
|
||
|
|
||
|
canonical_str = []
|
||
|
for key, value in eta_inv.items():
|
||
|
if not isinstance(value, list):
|
||
|
canonical_str.append(json.dumps(key, ensure_ascii=False).upper())
|
||
|
canonical_str.append(self._serialize_for_signing(value))
|
||
|
else:
|
||
|
canonical_str.append(json.dumps(key, ensure_ascii=False).upper())
|
||
|
for elem in value:
|
||
|
canonical_str.append(json.dumps(key, ensure_ascii=False).upper())
|
||
|
canonical_str.append(self._serialize_for_signing(elem))
|
||
|
return ''.join(canonical_str)
|
||
|
|
||
|
def _generate_signed_attrs(self, eta_invoice, signing_time):
|
||
|
cert = x509.Certificate.load(base64.b64decode(self.certificate))
|
||
|
data = hashlib.sha256(self._serialize_for_signing(eta_invoice).encode()).digest()
|
||
|
return cms.CMSAttributes([
|
||
|
cms.CMSAttribute({
|
||
|
'type': cms.CMSAttributeType('content_type'),
|
||
|
'values': ('digested_data',),
|
||
|
}),
|
||
|
cms.CMSAttribute({
|
||
|
'type': cms.CMSAttributeType('message_digest'),
|
||
|
'values': (data,),
|
||
|
}),
|
||
|
cms.CMSAttribute({
|
||
|
'type': tsp.CMSAttributeType('signing_certificate_v2'),
|
||
|
'values': ({
|
||
|
'certs': (tsp.ESSCertIDv2({
|
||
|
'hash_algorithm': algos.DigestAlgorithm({'algorithm': 'sha256'}),
|
||
|
'cert_hash': hashlib.sha256(cert.dump()).digest()
|
||
|
}),)
|
||
|
},),
|
||
|
}),
|
||
|
cms.CMSAttribute({
|
||
|
'type': cms.CMSAttributeType('signing_time'),
|
||
|
'values': (
|
||
|
cms.Time({'utc_time': core.UTCTime(signing_time.replace(tzinfo=pytz.UTC))}),)
|
||
|
}),
|
||
|
])
|
||
|
|
||
|
def _generate_signer_info(self, eta_invoice, signing_time, signature=False):
|
||
|
cert = x509.Certificate.load(base64.b64decode(self.certificate))
|
||
|
signer_info = {
|
||
|
'version': 'v1',
|
||
|
'sid': cms.SignerIdentifier({
|
||
|
'issuer_and_serial_number': cms.IssuerAndSerialNumber({
|
||
|
'issuer': cert.issuer,
|
||
|
'serial_number': cert.serial_number,
|
||
|
}),
|
||
|
}),
|
||
|
'digest_algorithm': algos.DigestAlgorithm({'algorithm': 'sha256'}),
|
||
|
'signature_algorithm': algos.SignedDigestAlgorithm({
|
||
|
'algorithm': 'sha256_rsa'
|
||
|
}),
|
||
|
'signed_attrs': self._generate_signed_attrs(eta_invoice, signing_time)
|
||
|
}
|
||
|
if signature:
|
||
|
signer_info['signature'] = signature
|
||
|
return signer_info
|
||
|
|
||
|
def _generate_cades_bes_signature(self, eta_invoice, signing_time, signature):
|
||
|
cert = x509.Certificate.load(base64.b64decode(self.certificate))
|
||
|
signed_data = {
|
||
|
'version': 'v3',
|
||
|
'digest_algorithms': cms.DigestAlgorithms((
|
||
|
algos.DigestAlgorithm({'algorithm': 'sha256'}),
|
||
|
)),
|
||
|
'encap_content_info': {
|
||
|
'content_type': 'digested_data',
|
||
|
},
|
||
|
'certificates': [cert],
|
||
|
'signer_infos': [
|
||
|
self._generate_signer_info(eta_invoice, signing_time, signature),
|
||
|
],
|
||
|
}
|
||
|
content_info = cms.ContentInfo({'content_type': cms.ContentType('signed_data'), 'content': cms.SignedData(signed_data)})
|
||
|
return base64.b64encode(content_info.dump()).decode()
|