Odoo18-Base/addons/certificate/models/key.py
2025-01-06 10:57:38 +07:00

280 lines
10 KiB
Python

import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.hazmat.primitives.serialization import Encoding
from odoo import _, api, fields, models
from odoo.exceptions import UserError
STR_TO_HASH = {
'sha1': hashes.SHA1(),
'sha256': hashes.SHA256(),
}
STR_TO_CURVE = {
'SECP256R1': ec.SECP256R1(),
}
def _get_formatted_value(data, formatting='encodebytes'):
if formatting == 'encodebytes':
return base64.encodebytes(data)
elif formatting == 'base64':
return base64.b64encode(data)
else:
return data
def _int_to_bytes(value, byteorder='big'):
return value.to_bytes((value.bit_length() + 7) // 8, byteorder=byteorder)
class Key(models.Model):
_name = 'certificate.key'
_description = 'Cryptographic Keys'
name = fields.Char(string='Name', default="New key")
content = fields.Binary(string='Key file', required=True)
password = fields.Char(string='Private key password')
pem_key = fields.Binary(
string='Key bytes in PEM format',
compute='_compute_pem_key',
store=True,
)
public = fields.Boolean(
string='Public/Private key',
compute='_compute_pem_key',
store=True,
)
loading_error = fields.Text(
string='Loading error',
compute='_compute_pem_key',
store=True,
)
active = fields.Boolean(name='Active', help='Set active to false to archive the key.', default=True)
company_id = fields.Many2one(
comodel_name='res.company',
string='Company',
required=True,
default=lambda self: self.env.company,
ondelete='cascade',
)
@api.depends('content', 'password')
def _compute_pem_key(self):
for key in self:
content = key.with_context(bin_size=False).content
if not content:
key.pem_key = None
key.public = None
key.loading_error = ""
else:
pkey_content = base64.b64decode(content)
pkey_password = key.password.encode('utf-8') if key.password else None
# Try to load the key in different format starting with DER then PEM for private then public keys.
# If none succeeded, we report an error.
pkey = None
try:
pkey = serialization.load_der_private_key(pkey_content, pkey_password)
key.public = False
except (ValueError, TypeError):
pass
if not pkey:
try:
pkey = serialization.load_pem_private_key(pkey_content, pkey_password)
key.public = False
except (ValueError, TypeError):
pass
if not pkey:
try:
pkey = serialization.load_der_public_key(pkey_content)
key.public = True
except (ValueError, TypeError):
pass
if not pkey:
try:
pkey = serialization.load_pem_public_key(pkey_content)
key.public = True
except (ValueError, TypeError):
pass
if not pkey:
key.pem_key = None
key.public = None
key.loading_error = _("This key could not be loaded. Either its content or its password is erroneous.")
continue
if key.public:
key.pem_key = base64.b64encode(pkey.public_bytes(
encoding=Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
))
else:
key.pem_key = base64.b64encode(pkey.private_bytes(
encoding=Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
key.loading_error = ""
# -------------------------------------------------------
# Business Methods #
# -------------------------------------------------------
def _sign(self, message, hashing_algorithm='sha256', formatting='encodebytes'):
""" Return the base64 encoded signature of message. """
self.ensure_one()
if self.public:
raise UserError(_("Make sure to use a private key to sign documents."))
return self._sign_with_key(
message,
self.with_context(bin_size=False).pem_key,
pwd=None,
hashing_algorithm=hashing_algorithm,
formatting=formatting
)
def _get_public_key_numbers_bytes(self, formatting='encodebytes'):
self.ensure_one()
return self._numbers_public_key_bytes_with_key(
self._get_public_key_bytes(encoding='PEM'),
formatting=formatting,
)
def _get_public_key_bytes(self, encoding='der', formatting='encodebytes'):
self.ensure_one()
if self.public:
public_key = serialization.load_pem_public_key(base64.b64decode(self.with_context(bin_size=False).pem_key))
else:
public_key = serialization.load_pem_private_key(base64.b64decode(self.with_context(bin_size=False).pem_key), None).public_key()
encoding = serialization.Encoding.DER if encoding == 'der' else serialization.Encoding.PEM
return _get_formatted_value(
public_key.public_bytes(
encoding=encoding,
format=serialization.PublicFormat.SubjectPublicKeyInfo
),
formatting=formatting,
)
def _decrypt(self, message, hashing_algorithm='sha256'):
self.ensure_one()
if not isinstance(message, bytes):
message = message.encode('utf-8')
if self.public:
raise UserError(_("A private key is required to decrypt data."))
if hashing_algorithm not in STR_TO_HASH:
raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")
private_key = serialization.load_pem_private_key(base64.b64decode(self.pem_key), None)
if not isinstance(private_key, rsa.RSAPrivateKey):
raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported for decryption: RSA.", type(private_key)))
return private_key.decrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=STR_TO_HASH[hashing_algorithm]),
algorithm=STR_TO_HASH[hashing_algorithm],
label=None
)
)
@api.model
def _sign_with_key(self, message, pem_key, pwd=None, hashing_algorithm='sha256', formatting='encodebytes'):
""" Return the base64 encoded signature of message. """
if not isinstance(message, bytes):
message = message.encode('utf-8')
if hashing_algorithm not in STR_TO_HASH:
raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")
try:
private_key = serialization.load_pem_private_key(base64.b64decode(pem_key), pwd)
except ValueError:
raise UserError(_("The private key could not be loaded."))
if isinstance(private_key, ec.EllipticCurvePrivateKey):
signature = private_key.sign(
message,
ec.ECDSA(STR_TO_HASH[hashing_algorithm])
)
elif isinstance(private_key, rsa.RSAPrivateKey):
signature = private_key.sign(
message,
padding.PKCS1v15(),
STR_TO_HASH[hashing_algorithm]
)
else:
raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported for signature: EC and RSA.", type(private_key)))
return _get_formatted_value(signature, formatting=formatting)
@api.model
def _numbers_public_key_bytes_with_key(self, pem_key, formatting='encodebytes'):
try:
public_key = serialization.load_pem_public_key(base64.b64decode(pem_key))
except ValueError:
raise UserError(_("The public key could not be loaded."))
if isinstance(public_key, ec.EllipticCurvePublicKey):
e = public_key.public_numbers().x
n = public_key.public_numbers().y
elif isinstance(public_key, rsa.RSAPublicKey):
e = public_key.public_numbers().e
n = public_key.public_numbers().n
else:
raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported: EC, RSA.", type(public_key)))
return (
_get_formatted_value(_int_to_bytes(e), formatting=formatting),
_get_formatted_value(_int_to_bytes(n), formatting=formatting)
)
@api.model
def _generate_ec_private_key(self, company, name='id_ec', curve='SECP256R1'):
if curve not in STR_TO_CURVE:
raise UserError(f"Unsupported curve algorithm '{curve}'. Currently supported: SECP256R1.")
private_key = ec.generate_private_key(STR_TO_CURVE[curve])
return self.env['certificate.key'].create({
'name': name,
'content': base64.b64encode(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())),
'company_id': company.id,
})
@api.model
def _generate_rsa_private_key(self, company, name='id_rsa', public_exponent=65537, key_size=2048):
private_key = rsa.generate_private_key(
public_exponent=public_exponent,
key_size=key_size
)
return self.env['certificate.key'].create({
'name': name,
'content': base64.b64encode(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())),
'company_id': company.id,
})