Odoo18-Base/addons/certificate/models/certificate.py
2025-01-06 10:57:38 +07:00

330 lines
15 KiB
Python

import base64
import datetime
from importlib import metadata
from cryptography import x509
from cryptography.hazmat.primitives import constant_time, serialization
from cryptography.hazmat.primitives.serialization import Encoding, pkcs12
from odoo import _, api, fields, models
from .key import STR_TO_HASH, _get_formatted_value
from odoo.exceptions import UserError
from odoo.osv import expression
from odoo.tools import parse_version
class Certificate(models.Model):
    """Certificate uploaded by a user, with the data parsed out of its content.

    ``content`` is the only required input besides the company. The PEM form,
    detected format, subject name, serial number, validity dates and loading
    error are stored computed fields, all filled by ``_compute_pem_certificate``.
    """
    _name = 'certificate.certificate'
    _description = 'Certificate'
    _order = 'date_end DESC'
    _check_company_auto = True

    name = fields.Char(string='Name')
    # Uploaded file; `_compute_pem_certificate` tries to read it as DER, PKCS12, then PEM.
    content = fields.Binary(string='Certificate', readonly=False, required=True)
    # Only used when the content is loaded as PKCS12.
    pkcs12_password = fields.Char(string='Certificate Password', help='Password to decrypt the PKS file.')
    # Stored computed field that stays editable (readonly=False): it is filled by
    # `_compute_private_key` for PKCS12 content, and can also be set by hand.
    private_key_id = fields.Many2one(
        string='Private Key',
        comodel_name='certificate.key',
        check_company=True,
        domain=[('public', '=', False)],
        compute='_compute_private_key',
        store=True,
        readonly=False,
    )
    # Optional override of the public key embedded in the certificate.
    public_key_id = fields.Many2one(
        string='Public Key',
        comodel_name='certificate.key',
        check_company=True,
        domain=[('public', '=', True)],
        help="""Used to set a public key in case the one self-contained in the certificate is erroneus.
When a public key is set this way, it will be used instead of the one in the certificate.
""",
    )
    scope = fields.Selection(
        string="Certificate scope",
        selection=[
            ('general', 'General'),
        ],
    )

    # --- Fields derived from `content` by `_compute_pem_certificate` ---
    content_format = fields.Selection(
        selection=[
            ('der', 'DER'),
            ('pem', 'PEM'),
            ('pkcs12', 'PKCS12'),
        ],
        string='Original certificate format',
        compute='_compute_pem_certificate',
        store=True,
    )
    pem_certificate = fields.Binary(
        string='Certificate in PEM format',
        compute='_compute_pem_certificate',
        store=True,
    )
    subject_common_name = fields.Char(
        string='Subject Name',
        compute='_compute_pem_certificate',
        store=True,
    )
    serial_number = fields.Char(
        string='Serial number',
        help='The serial number to add to electronic documents',
        compute='_compute_pem_certificate',
        store=True,
    )
    date_start = fields.Datetime(
        string='Available date',
        help='The date on which the certificate starts to be valid (UTC)',
        compute='_compute_pem_certificate',
        store=True,
    )
    date_end = fields.Datetime(
        string='Expiration date',
        help='The date on which the certificate expires (UTC)',
        compute='_compute_pem_certificate',
        store=True,
    )
    # Empty string when the content loaded fine (or when there is no content).
    loading_error = fields.Text(string='Loading error', compute='_compute_pem_certificate', store=True)
    # Not stored: depends on the current time, hence the dedicated search method.
    is_valid = fields.Boolean(string='Valid', compute='_compute_is_valid', search='_search_is_valid')
    # The label keyword is `string`, as on every other field of this model
    # (`name` is not the label parameter).
    active = fields.Boolean(string='Active', help='Set active to false to archive the certificate', default=True)
    company_id = fields.Many2one(
        comodel_name='res.company',
        string='Company',
        required=True,
        default=lambda self: self.env.company,
        ondelete='cascade',
    )
    country_code = fields.Char(related='company_id.country_code', depends=['company_id'])
@api.depends('pem_certificate')
def _compute_private_key(self):
    """Link PKCS12 certificates to the private key embedded in their file.

    For each certificate:
    - without a PEM certificate, the private key link is cleared;
    - with a private key already set, nothing is changed;
    - otherwise, if the content was loaded as PKCS12 and holds a key, that key
      is exported as unencrypted PKCS8 PEM and linked, reusing an existing
      ``certificate.key`` with the same content and company when one is found.
    """
    # Index the stored key contents by (content, company) so an identical key
    # is reused instead of duplicated. The attachments belong to
    # `certificate.key`, so they are selected through the companies of the
    # certificates; filtering `res_id` on certificate ids would compare ids of
    # two unrelated models.
    attachments = self.env['ir.attachment'].search([
        ('res_model', '=', 'certificate.key'),
        ('res_field', '=', 'content'),
        ('company_id', 'in', self.company_id.ids),
    ])
    content_to_key_id = {(att.datas, att.company_id.id): att.res_id for att in attachments}

    for certificate in self:
        if not certificate.pem_certificate:
            certificate.private_key_id = None
            continue
        if certificate.private_key_id:
            continue
        # Only PKCS12 files can carry a private key next to the certificate.
        if certificate.content_format != 'pkcs12':
            continue

        content = certificate.with_context(bin_size=False).content
        pkcs12_password = certificate.pkcs12_password.encode('utf-8') if certificate.pkcs12_password else None
        key, _cert, _additional_certs = pkcs12.load_key_and_certificates(base64.b64decode(content), pkcs12_password)
        if not key:
            continue

        pem_key = base64.b64encode(key.private_bytes(
            encoding=Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        ))
        lookup = (pem_key, certificate.company_id.id)
        key_id = content_to_key_id.get(lookup)
        if not key_id:
            key_id = self.env['certificate.key'].create({
                'name': (certificate.subject_common_name or certificate.name or "") + ".key",
                'content': pem_key,
                'company_id': certificate.company_id.id,
            }).id
            # Remember the new key so that other certificates of this batch
            # carrying the same key reuse it instead of creating a duplicate.
            content_to_key_id[lookup] = key_id
        certificate.private_key_id = key_id
@api.depends('content', 'pkcs12_password')
def _compute_pem_certificate(self):
    """Parse the uploaded content and fill every field derived from it.

    The content is tried as DER, then PKCS12 (using ``pkcs12_password``), and
    finally PEM. When a certificate is obtained, its PEM form, subject common
    name, serial number and validity dates are stored and ``loading_error`` is
    emptied. Otherwise all derived fields are cleared; ``loading_error`` gets
    a message when there was content that could not be loaded, and stays
    empty when there was no content at all.
    """
    # Loop-invariant: from cryptography 42.0.0 on, the timezone-aware
    # `not_valid_*_utc` accessors are used instead of the older ones.
    legacy_dates = parse_version(metadata.version('cryptography')) < parse_version('42.0.0')

    for certificate in self:
        content = certificate.with_context(bin_size=False).content
        cert = None
        content_format = None

        if content:
            raw = base64.b64decode(content)
            # Try the formats in order: DER, PKCS12, PEM. A loader that does
            # not recognise the data raises ValueError, so we move on.
            try:
                cert = x509.load_der_x509_certificate(raw)
                content_format = 'der'
            except ValueError:
                pass
            if not cert:
                try:
                    pkcs12_password = certificate.pkcs12_password.encode('utf-8') if certificate.pkcs12_password else None
                    _key, cert, _additional_certs = pkcs12.load_key_and_certificates(raw, pkcs12_password)
                    content_format = 'pkcs12'
                except ValueError:
                    pass
            if not cert:
                try:
                    cert = x509.load_pem_x509_certificate(raw)
                    content_format = 'pem'
                except ValueError:
                    pass

        if not cert:
            # Nothing usable: clear every derived field. An error is only
            # reported when there actually was content to load.
            certificate.pem_certificate = None
            certificate.subject_common_name = None
            certificate.content_format = None
            certificate.date_start = None
            certificate.date_end = None
            certificate.serial_number = None
            if content:
                certificate.loading_error = _("This certificate could not be loaded. Either the content or the password is erroneous.")
            else:
                certificate.loading_error = ""
            continue

        certificate.content_format = content_format
        try:
            common_name = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
            certificate.subject_common_name = common_name[0].value if common_name else ""
        except ValueError:
            certificate.subject_common_name = None
        certificate.loading_error = ""

        # Extract certificate data
        certificate.pem_certificate = base64.b64encode(cert.public_bytes(Encoding.PEM))
        certificate.serial_number = cert.serial_number
        if legacy_dates:
            certificate.date_start = cert.not_valid_before
            certificate.date_end = cert.not_valid_after
        else:
            # The fields hold naive datetimes, so the timezone info is dropped.
            certificate.date_start = cert.not_valid_before_utc.replace(tzinfo=None)
            certificate.date_end = cert.not_valid_after_utc.replace(tzinfo=None)
@api.depends('date_start', 'date_end', 'loading_error')
def _compute_is_valid(self):
    """Flag certificates whose validity period contains the current UTC time.

    A certificate with a missing start or end date, or with a loading error,
    is never valid.
    """
    # Certificate dates are UTC timezoned
    # https://cryptography.io/en/latest/x509/reference/#cryptography.x509.Certificate.not_valid_after
    utc = datetime.timezone.utc
    now = datetime.datetime.now(utc)
    for record in self:
        start, end = record.date_start, record.date_end
        if start and end and not record.loading_error:
            # Stored dates are naive; tag them as UTC before comparing.
            record.is_valid = start.replace(tzinfo=utc) <= now <= end.replace(tzinfo=utc)
        else:
            record.is_valid = False
def _search_is_valid(self, operator, value):
if operator not in ['=', '!='] or not isinstance(value, bool):
raise NotImplementedError("Operation not supported, only '=' and '!=' are allowed.")
utc_now = datetime.datetime.now(datetime.timezone.utc)
if (operator == '=' and value) or (operator == '!=' and not value):
return [
('pem_certificate', '!=', False),
('date_start', '<=', utc_now),
('date_end', '>=', utc_now),
('loading_error', '=', '')
]
else:
return expression.OR([
[('pem_certificate', '=', False)],
[('date_start', '=', False)],
[('date_end', '=', False)],
[('date_start', '>', utc_now)],
[('date_end', '<', utc_now)],
[('loading_error', '!=', '')],
])
@api.constrains('pem_certificate', 'private_key_id', 'public_key_id')
def _constrains_certificate_key_compatibility(self):
    """Reject keys that do not match the public key held by the certificate.

    For every certificate with a PEM form, the public key of the linked
    private key and the linked public key (when set) must each be
    byte-identical to the certificate's own public key in PEM
    SubjectPublicKeyInfo form. A key with a loading error is reported with
    that error.

    :raises UserError: on a key loading error or on a key mismatch
    """
    for record in self:
        pem_b64 = record.with_context(bin_size=False).pem_certificate
        if not pem_b64:
            continue

        loaded = x509.load_pem_x509_certificate(base64.b64decode(pem_b64))
        expected = loaded.public_key().public_bytes(
            encoding=Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

        private_key = record.private_key_id
        if private_key:
            if private_key.loading_error:
                raise UserError(private_key.loading_error)
            derived = base64.b64decode(private_key._get_public_key_bytes(encoding='pem'))
            if not constant_time.bytes_eq(derived, expected):
                raise UserError(_("The certificate and private key are not compatible."))

        public_key = record.public_key_id
        if public_key:
            if public_key.loading_error:
                raise UserError(public_key.loading_error)
            provided = base64.b64decode(public_key._get_public_key_bytes(encoding='pem'))
            if not constant_time.bytes_eq(provided, expected):
                raise UserError(_("The certificate and public key are not compatible."))
# -------------------------------------------------------
# Business Methods #
# -------------------------------------------------------
def _get_der_certificate_bytes(self, formatting='encodebytes'):
    """Return the certificate serialized as DER, passed through ``_get_formatted_value``.

    :param formatting: forwarded as-is to ``_get_formatted_value``
    """
    self.ensure_one()
    pem_b64 = self.with_context(bin_size=False).pem_certificate
    loaded = x509.load_pem_x509_certificate(base64.b64decode(pem_b64))
    der_bytes = loaded.public_bytes(serialization.Encoding.DER)
    return _get_formatted_value(der_bytes, formatting=formatting)
def _get_fingerprint_bytes(self, hashing_algorithm='sha256', formatting='encodebytes'):
    """Return the certificate fingerprint, passed through ``_get_formatted_value``.

    :param hashing_algorithm: key of ``STR_TO_HASH`` selecting the hash to use
    :param formatting: forwarded as-is to ``_get_formatted_value``
    :raises UserError: when ``hashing_algorithm`` is not a key of ``STR_TO_HASH``
    """
    self.ensure_one()
    pem_b64 = self.with_context(bin_size=False).pem_certificate
    loaded = x509.load_pem_x509_certificate(base64.b64decode(pem_b64))
    if hashing_algorithm in STR_TO_HASH:
        digest = loaded.fingerprint(STR_TO_HASH[hashing_algorithm])
        return _get_formatted_value(digest, formatting=formatting)
    raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")
def _get_signature_bytes(self, formatting='encodebytes'):
    """Return the signature stored in the certificate, passed through ``_get_formatted_value``.

    :param formatting: forwarded as-is to ``_get_formatted_value``
    """
    self.ensure_one()
    pem_b64 = self.with_context(bin_size=False).pem_certificate
    loaded = x509.load_pem_x509_certificate(base64.b64decode(pem_b64))
    signature = loaded.signature
    return _get_formatted_value(signature, formatting=formatting)
def _get_public_key_numbers_bytes(self, formatting='encodebytes'):
    """Return the public key numbers of the certificate.

    A linked key takes precedence (the public key first, then the private
    key) and the call is delegated to it. Without any linked key, the public
    key contained in the certificate itself is used.

    :param formatting: forwarded as-is to the key helpers
    """
    self.ensure_one()
    linked_key = self.public_key_id or self.private_key_id
    if linked_key:
        return linked_key._get_public_key_numbers_bytes(formatting=formatting)

    # When no keys are set to the certificate, use the self-contained public key from the content
    embedded_pem = self._get_public_key_bytes(encoding='pem')
    return self.env['certificate.key']._numbers_public_key_bytes_with_key(
        embedded_pem,
        formatting=formatting,
    )
def _get_public_key_bytes(self, encoding='der', formatting='encodebytes'):
    """Return the public key of the certificate in SubjectPublicKeyInfo form.

    A linked key takes precedence (the public key first, then the private
    key) and the call is delegated to it. Without any linked key, the public
    key is read from the stored PEM certificate.

    :param encoding: ``'der'`` for DER output; any other value yields PEM
    :param formatting: forwarded as-is to ``_get_formatted_value``
    :raises UserError: when there is no PEM certificate to read the key from,
        or when it cannot be parsed
    """
    self.ensure_one()
    linked_key = self.public_key_id or self.private_key_id
    if linked_key:
        return linked_key._get_public_key_bytes(encoding=encoding, formatting=formatting)

    # When no keys are set to the certificate, use the self-contained public key from the content
    pem_certificate = self.with_context(bin_size=False).pem_certificate
    if not pem_certificate:
        # A certificate that failed to load has no PEM form; report it like
        # any other unreadable key instead of letting b64decode raise a
        # TypeError on the empty value.
        raise UserError(_("The public key from the certificate could not be loaded."))
    try:
        cert = x509.load_pem_x509_certificate(base64.b64decode(pem_certificate))
        public_key = cert.public_key()
    except ValueError as error:
        raise UserError(_("The public key from the certificate could not be loaded.")) from error

    output_encoding = serialization.Encoding.DER if encoding == 'der' else serialization.Encoding.PEM
    return _get_formatted_value(
        public_key.public_bytes(
            encoding=output_encoding,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        ),
        formatting=formatting,
    )
def _sign(self, message, hashing_algorithm='sha256', formatting='encodebytes'):
    """Sign ``message`` with the private key linked to this certificate.

    The signing itself is delegated to the private key's ``_sign`` method,
    which receives ``hashing_algorithm`` and ``formatting`` unchanged.

    :raises UserError: when the certificate is not valid (its loading error
        is reported if there is one) or when no private key is linked
    """
    self.ensure_one()
    if not self.is_valid:
        reason = self.loading_error or _("This certificate is not valid, its validity has expired.")
        raise UserError(reason)
    signing_key = self.private_key_id
    if not signing_key:
        raise UserError(_("No private key linked to the certificate, it is required to sign documents."))
    return signing_key._sign(message, hashing_algorithm=hashing_algorithm, formatting=formatting)