import base64 import datetime from importlib import metadata from cryptography import x509 from cryptography.hazmat.primitives import constant_time, serialization from cryptography.hazmat.primitives.serialization import Encoding, pkcs12 from odoo import _, api, fields, models from .key import STR_TO_HASH, _get_formatted_value from odoo.exceptions import UserError from odoo.osv import expression from odoo.tools import parse_version class Certificate(models.Model): _name = 'certificate.certificate' _description = 'Certificate' _order = 'date_end DESC' _check_company_auto = True name = fields.Char(string='Name') content = fields.Binary(string='Certificate', readonly=False, required=True) pkcs12_password = fields.Char(string='Certificate Password', help='Password to decrypt the PKS file.') private_key_id = fields.Many2one( string='Private Key', comodel_name='certificate.key', check_company=True, domain=[('public', '=', False)], compute='_compute_private_key', store=True, readonly=False, ) public_key_id = fields.Many2one( string='Public Key', comodel_name='certificate.key', check_company=True, domain=[('public', '=', True)], help="""Used to set a public key in case the one self-contained in the certificate is erroneus. When a public key is set this way, it will be used instead of the one in the certificate. """, ) scope = fields.Selection( string="Certificate scope", selection=[ ('general', 'General'), ], ) content_format = fields.Selection( selection=[ ('der', 'DER'), ('pem', 'PEM'), ('pkcs12', 'PKCS12'), ], string='Original certificate format', compute='_compute_pem_certificate', store=True, ) pem_certificate = fields.Binary( string='Certificate in PEM format', compute='_compute_pem_certificate', store=True, ) subject_common_name = fields.Char( string='Subject Name', compute='_compute_pem_certificate', store=True, ) serial_number = fields.Char( string='Serial number', help='The serial number to add to electronic documents', compute='_compute_pem_certificate', store=True, ) date_start = fields.Datetime( string='Available date', help='The date on which the certificate starts to be valid (UTC)', compute='_compute_pem_certificate', store=True, ) date_end = fields.Datetime( string='Expiration date', help='The date on which the certificate expires (UTC)', compute='_compute_pem_certificate', store=True, ) loading_error = fields.Text(string='Loading error', compute='_compute_pem_certificate', store=True) is_valid = fields.Boolean(string='Valid', compute='_compute_is_valid', search='_search_is_valid') active = fields.Boolean(name='Active', help='Set active to false to archive the certificate', default=True) company_id = fields.Many2one( comodel_name='res.company', string='Company', required=True, default=lambda self: self.env.company, ondelete='cascade', ) country_code = fields.Char(related='company_id.country_code', depends=['company_id']) @api.depends('pem_certificate') def _compute_private_key(self): attachments = self.env['ir.attachment'].search([ ('res_model', '=', 'certificate.key'), ('res_field', '=', 'content'), ('res_id', 'in', self.ids) ]) content_to_key_id = {(att.datas, att.company_id.id): att.res_id for att in attachments} for certificate in self: if not certificate.pem_certificate: certificate.private_key_id = None continue if certificate.private_key_id: continue # Create the private key in case of PKCS12 File and no private key is set if certificate.content_format == 'pkcs12': content = certificate.with_context(bin_size=False).content pkcs12_password = certificate.pkcs12_password.encode('utf-8') if certificate.pkcs12_password else None key, _cert, _additional_certs = pkcs12.load_key_and_certificates(base64.b64decode(content), pkcs12_password) if key: pem_key = base64.b64encode(key.private_bytes( encoding=Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) key_id = content_to_key_id.get((pem_key, certificate.company_id.id)) if not key_id: key_id = self.env['certificate.key'].create({ 'name': (certificate.subject_common_name or certificate.name or "") + ".key", 'content': pem_key, 'company_id': certificate.company_id.id, }) certificate.private_key_id = key_id @api.depends('content', 'pkcs12_password') def _compute_pem_certificate(self): for certificate in self: content = certificate.with_context(bin_size=False).content if not content: certificate.pem_certificate = None certificate.subject_common_name = None certificate.content_format = None certificate.date_start = None certificate.date_end = None certificate.serial_number = None certificate.loading_error = "" else: content = base64.b64decode(content) cert = None # Try to load the certificate in different format starting with DER then PKCS12 and # finally PEM. If none succeeded, we report an error. try: cert = x509.load_der_x509_certificate(content) certificate.content_format = 'der' except ValueError: pass if not cert: try: pkcs12_password = certificate.pkcs12_password.encode('utf-8') if certificate.pkcs12_password else None _key, cert, _additional_certs = pkcs12.load_key_and_certificates(content, pkcs12_password) certificate.content_format = 'pkcs12' except ValueError: pass if not cert: try: cert = x509.load_pem_x509_certificate(content) certificate.content_format = 'pem' except ValueError: pass if not cert: certificate.pem_certificate = None certificate.subject_common_name = None certificate.content_format = None certificate.date_start = None certificate.date_end = None certificate.serial_number = None certificate.loading_error = _("This certificate could not be loaded. Either the content or the password is erroneous.") continue try: common_name = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) certificate.subject_common_name = common_name[0].value if common_name else "" except ValueError: certificate.subject_common_name = None certificate.loading_error = "" # Extract certificate data certificate.pem_certificate = base64.b64encode(cert.public_bytes(Encoding.PEM)) certificate.serial_number = cert.serial_number if parse_version(metadata.version('cryptography')) < parse_version('42.0.0'): certificate.date_start = cert.not_valid_before certificate.date_end = cert.not_valid_after else: certificate.date_start = cert.not_valid_before_utc.replace(tzinfo=None) certificate.date_end = cert.not_valid_after_utc.replace(tzinfo=None) @api.depends('date_start', 'date_end', 'loading_error') def _compute_is_valid(self): # Certificate dates are UTC timezoned # https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Certificate.not_valid_after utc_now = datetime.datetime.now(datetime.timezone.utc) for certificate in self: if not certificate.date_start or not certificate.date_end or certificate.loading_error: certificate.is_valid = False else: date_start = certificate.date_start.replace(tzinfo=datetime.timezone.utc) date_end = certificate.date_end.replace(tzinfo=datetime.timezone.utc) certificate.is_valid = date_start <= utc_now <= date_end def _search_is_valid(self, operator, value): if operator not in ['=', '!='] or not isinstance(value, bool): raise NotImplementedError("Operation not supported, only '=' and '!=' are allowed.") utc_now = datetime.datetime.now(datetime.timezone.utc) if (operator == '=' and value) or (operator == '!=' and not value): return [ ('pem_certificate', '!=', False), ('date_start', '<=', utc_now), ('date_end', '>=', utc_now), ('loading_error', '=', '') ] else: return expression.OR([ [('pem_certificate', '=', False)], [('date_start', '=', False)], [('date_end', '=', False)], [('date_start', '>', utc_now)], [('date_end', '<', utc_now)], [('loading_error', '!=', '')], ]) @api.constrains('pem_certificate', 'private_key_id', 'public_key_id') def _constrains_certificate_key_compatibility(self): for certificate in self: pem_certificate = certificate.with_context(bin_size=False).pem_certificate if pem_certificate: cert = x509.load_pem_x509_certificate(base64.b64decode(pem_certificate)) cert_public_key_bytes = cert.public_key().public_bytes( encoding=Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) if certificate.private_key_id: if certificate.private_key_id.loading_error: raise UserError(certificate.private_key_id.loading_error) pkey_public_key_bytes = base64.b64decode( certificate.private_key_id._get_public_key_bytes(encoding='pem') ) if not constant_time.bytes_eq(pkey_public_key_bytes, cert_public_key_bytes): raise UserError(_("The certificate and private key are not compatible.")) if certificate.public_key_id: if certificate.public_key_id.loading_error: raise UserError(certificate.public_key_id.loading_error) pkey_public_key_bytes = base64.b64decode( certificate.public_key_id._get_public_key_bytes(encoding='pem') ) if not constant_time.bytes_eq(pkey_public_key_bytes, cert_public_key_bytes): raise UserError(_("The certificate and public key are not compatible.")) # ------------------------------------------------------- # Business Methods # # ------------------------------------------------------- def _get_der_certificate_bytes(self, formatting='encodebytes'): self.ensure_one() cert = x509.load_pem_x509_certificate(base64.b64decode(self.with_context(bin_size=False).pem_certificate)) return _get_formatted_value(cert.public_bytes(serialization.Encoding.DER), formatting=formatting) def _get_fingerprint_bytes(self, hashing_algorithm='sha256', formatting='encodebytes'): self.ensure_one() cert = x509.load_pem_x509_certificate(base64.b64decode(self.with_context(bin_size=False).pem_certificate)) if hashing_algorithm not in STR_TO_HASH: raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.") return _get_formatted_value(cert.fingerprint(STR_TO_HASH[hashing_algorithm]), formatting=formatting) def _get_signature_bytes(self, formatting='encodebytes'): self.ensure_one() cert = x509.load_pem_x509_certificate(base64.b64decode(self.with_context(bin_size=False).pem_certificate)) return _get_formatted_value(cert.signature, formatting=formatting) def _get_public_key_numbers_bytes(self, formatting='encodebytes'): self.ensure_one() if self.public_key_id or self.private_key_id: return (self.public_key_id or self.private_key_id)._get_public_key_numbers_bytes(formatting=formatting) # When no keys are set to the certificate, use the self-contained public key from the content return self.env['certificate.key']._numbers_public_key_bytes_with_key( self._get_public_key_bytes(encoding='pem'), formatting=formatting, ) def _get_public_key_bytes(self, encoding='der', formatting='encodebytes'): self.ensure_one() if self.public_key_id or self.private_key_id: return (self.public_key_id or self.private_key_id)._get_public_key_bytes(encoding=encoding, formatting=formatting) # When no keys are set to the certificate, use the self-contained public key from the content try: cert = x509.load_pem_x509_certificate(base64.b64decode(self.with_context(bin_size=False).pem_certificate)) public_key = cert.public_key() except ValueError: raise UserError(_("The public key from the certificate could not be loaded.")) encoding = serialization.Encoding.DER if encoding == 'der' else serialization.Encoding.PEM return _get_formatted_value( public_key.public_bytes( encoding=encoding, format=serialization.PublicFormat.SubjectPublicKeyInfo ), formatting=formatting, ) def _sign(self, message, hashing_algorithm='sha256', formatting='encodebytes'): """ Return the base64 encoded signature of message. """ self.ensure_one() if not self.is_valid: raise UserError(self.loading_error or _("This certificate is not valid, its validity has expired.")) if not self.private_key_id: raise UserError(_("No private key linked to the certificate, it is required to sign documents.")) return self.private_key_id._sign(message, hashing_algorithm=hashing_algorithm, formatting=formatting)