280 lines
10 KiB
Python
280 lines
10 KiB
Python
import base64
|
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
|
|
from cryptography.hazmat.primitives.serialization import Encoding
|
|
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import UserError
|
|
|
|
|
|
# Hash algorithms selectable by name through the `hashing_algorithm`
# argument of the signing and decryption helpers of this module.
STR_TO_HASH = dict(
    sha1=hashes.SHA1(),
    sha256=hashes.SHA256(),
)
|
|
|
|
# Elliptic curves selectable by name through the `curve` argument of the
# EC private key generator of this module.
STR_TO_CURVE = dict(
    SECP256R1=ec.SECP256R1(),
)
|
|
|
|
|
|
def _get_formatted_value(data, formatting='encodebytes'):
|
|
if formatting == 'encodebytes':
|
|
return base64.encodebytes(data)
|
|
elif formatting == 'base64':
|
|
return base64.b64encode(data)
|
|
else:
|
|
return data
|
|
|
|
|
|
def _int_to_bytes(value, byteorder='big'):
|
|
return value.to_bytes((value.bit_length() + 7) // 8, byteorder=byteorder)
|
|
|
|
|
|
class Key(models.Model):
    """Private or public cryptographic key attached to a company.

    The uploaded `content` is parsed by `_compute_pem_key` and kept in
    `pem_key` as base64-encoded PEM. Private keys are re-serialised without
    encryption, so `password` is only used to read the uploaded file.
    """
    _name = 'certificate.key'
    _description = 'Cryptographic Keys'

    name = fields.Char(string='Name', default="New key")
    content = fields.Binary(string='Key file', required=True)
    password = fields.Char(string='Private key password')
    pem_key = fields.Binary(
        string='Key bytes in PEM format',
        compute='_compute_pem_key',
        store=True,
    )
    public = fields.Boolean(
        string='Public/Private key',
        compute='_compute_pem_key',
        store=True,
    )
    loading_error = fields.Text(
        string='Loading error',
        compute='_compute_pem_key',
        store=True,
    )
    active = fields.Boolean(string='Active', help='Set active to false to archive the key.', default=True)
    company_id = fields.Many2one(
        comodel_name='res.company',
        string='Company',
        required=True,
        default=lambda self: self.env.company,
        ondelete='cascade',
    )

    @api.depends('content', 'password')
    def _compute_pem_key(self):
        """Parse `content` and fill `pem_key`, `public` and `loading_error`.

        The decoded content is tried as a DER private key, a PEM private key,
        a DER public key and finally a PEM public key. The first format that
        loads wins. When none does, `pem_key` and `public` are reset and
        `loading_error` receives an explanatory message.
        """
        for key in self:
            content = key.with_context(bin_size=False).content
            if not content:
                key.pem_key = None
                key.public = None
                key.loading_error = ""
                continue

            pkey_content = base64.b64decode(content)
            pkey_password = key.password.encode('utf-8') if key.password else None

            # Try to load the key in different formats, starting with DER then PEM,
            # for private then public keys. If none succeeds, we report an error.
            loaders = (
                (serialization.load_der_private_key, (pkey_content, pkey_password), False),
                (serialization.load_pem_private_key, (pkey_content, pkey_password), False),
                (serialization.load_der_public_key, (pkey_content,), True),
                (serialization.load_pem_public_key, (pkey_content,), True),
            )
            pkey = None
            is_public = None
            for loader, loader_args, loads_public_key in loaders:
                try:
                    pkey = loader(*loader_args)
                except (ValueError, TypeError):
                    continue
                is_public = loads_public_key
                break

            if pkey is None:
                key.pem_key = None
                key.public = None
                key.loading_error = _("This key could not be loaded. Either its content or its password is erroneous.")
                continue

            key.public = is_public
            if is_public:
                key.pem_key = base64.b64encode(pkey.public_bytes(
                    encoding=Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo,
                ))
            else:
                # The private key is stored unencrypted: later loads pass no password.
                key.pem_key = base64.b64encode(pkey.private_bytes(
                    encoding=Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption(),
                ))

            key.loading_error = ""

    # -------------------------------------------------------
    #                   Business Methods                    #
    # -------------------------------------------------------

    def _sign(self, message, hashing_algorithm='sha256', formatting='encodebytes'):
        """Return the signature of `message` made with this private key.

        :param message: data to sign, forwarded to `_sign_with_key`
        :param hashing_algorithm: key of `STR_TO_HASH`
        :param formatting: 'encodebytes' or 'base64' for a base64-encoded
            signature, any other value for the raw signature bytes
        :raises UserError: if this record holds a public key, plus any error
            raised by `_sign_with_key`
        """
        self.ensure_one()

        if self.public:
            raise UserError(_("Make sure to use a private key to sign documents."))

        return self._sign_with_key(
            message,
            self.with_context(bin_size=False).pem_key,
            pwd=None,
            hashing_algorithm=hashing_algorithm,
            formatting=formatting,
        )

    def _get_public_key_numbers_bytes(self, formatting='encodebytes'):
        """Return the two public numbers of this key as formatted bytes.

        :param formatting: see `_get_formatted_value`
        :return: the pair built by `_numbers_public_key_bytes_with_key`
        """
        self.ensure_one()

        # The PEM is returned base64-encoded (default formatting), which is
        # what `_numbers_public_key_bytes_with_key` decodes.
        return self._numbers_public_key_bytes_with_key(
            self._get_public_key_bytes(encoding='PEM'),
            formatting=formatting,
        )

    def _get_public_key_bytes(self, encoding='der', formatting='encodebytes'):
        """Return the public key (SubjectPublicKeyInfo) of this record.

        For a private key record, the public key is derived from it.

        :param encoding: exactly 'der' selects DER; any other value selects PEM
        :param formatting: see `_get_formatted_value`
        :return: the serialized public key, formatted as requested
        """
        self.ensure_one()

        pem_bytes = base64.b64decode(self.with_context(bin_size=False).pem_key)
        if self.public:
            public_key = serialization.load_pem_public_key(pem_bytes)
        else:
            public_key = serialization.load_pem_private_key(pem_bytes, None).public_key()

        encoding = serialization.Encoding.DER if encoding == 'der' else serialization.Encoding.PEM
        return _get_formatted_value(
            public_key.public_bytes(
                encoding=encoding,
                format=serialization.PublicFormat.SubjectPublicKeyInfo,
            ),
            formatting=formatting,
        )

    def _decrypt(self, message, hashing_algorithm='sha256'):
        """Decrypt `message` with this RSA private key using OAEP padding.

        :param message: ciphertext; a `str` is UTF-8 encoded first
        :param hashing_algorithm: key of `STR_TO_HASH`, used for both the
            OAEP hash and its MGF1 mask generation function
        :return: the decrypted bytes
        :raises UserError: if this record holds a public key, if the hashing
            algorithm is unknown, or if the key is not an RSA key
        """
        self.ensure_one()

        if not isinstance(message, bytes):
            message = message.encode('utf-8')

        if self.public:
            raise UserError(_("A private key is required to decrypt data."))
        if hashing_algorithm not in STR_TO_HASH:
            raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")

        # Read with bin_size=False, like the other methods do, so that the
        # actual key bytes are read rather than a size placeholder.
        pem_bytes = base64.b64decode(self.with_context(bin_size=False).pem_key)
        private_key = serialization.load_pem_private_key(pem_bytes, None)
        if not isinstance(private_key, rsa.RSAPrivateKey):
            raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported for decryption: RSA.", type(private_key)))

        hash_algorithm = STR_TO_HASH[hashing_algorithm]
        return private_key.decrypt(
            message,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hash_algorithm),
                algorithm=hash_algorithm,
                label=None,
            ),
        )

    @api.model
    def _sign_with_key(self, message, pem_key, pwd=None, hashing_algorithm='sha256', formatting='encodebytes'):
        """Return the signature of `message` made with the given private key.

        EC keys sign with ECDSA, RSA keys with PKCS#1 v1.5 padding.

        :param message: data to sign; a `str` is UTF-8 encoded first
        :param pem_key: base64-encoded PEM private key
        :param pwd: password of the PEM key, handed as is to the loader
            (presumably bytes or None - confirm against callers)
        :param hashing_algorithm: key of `STR_TO_HASH`
        :param formatting: see `_get_formatted_value`
        :raises UserError: if the hashing algorithm is unknown, the key cannot
            be loaded, or the key is neither an EC nor an RSA key
        """
        if not isinstance(message, bytes):
            message = message.encode('utf-8')

        if hashing_algorithm not in STR_TO_HASH:
            raise UserError(f"Unsupported hashing algorithm '{hashing_algorithm}'. Currently supported: sha1 and sha256.")

        try:
            private_key = serialization.load_pem_private_key(base64.b64decode(pem_key), pwd)
        except (ValueError, TypeError) as error:
            # TypeError covers an empty `pem_key` (key that failed to load) and
            # a password that does not match the encryption state of the key.
            raise UserError(_("The private key could not be loaded.")) from error

        hash_algorithm = STR_TO_HASH[hashing_algorithm]
        if isinstance(private_key, ec.EllipticCurvePrivateKey):
            signature = private_key.sign(message, ec.ECDSA(hash_algorithm))
        elif isinstance(private_key, rsa.RSAPrivateKey):
            signature = private_key.sign(message, padding.PKCS1v15(), hash_algorithm)
        else:
            raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported for signature: EC and RSA.", type(private_key)))

        return _get_formatted_value(signature, formatting=formatting)

    @api.model
    def _numbers_public_key_bytes_with_key(self, pem_key, formatting='encodebytes'):
        """Return the two public numbers of the given public key.

        :param pem_key: base64-encoded PEM public key
        :param formatting: see `_get_formatted_value`
        :return: a pair of formatted byte strings: (x, y) for an EC key,
            (e, n) for an RSA key
        :raises UserError: if the key cannot be loaded or is neither an EC
            nor an RSA key
        """
        try:
            public_key = serialization.load_pem_public_key(base64.b64decode(pem_key))
        except (ValueError, TypeError) as error:
            # TypeError covers an empty `pem_key` handed to the base64 decoder.
            raise UserError(_("The public key could not be loaded.")) from error

        if isinstance(public_key, ec.EllipticCurvePublicKey):
            numbers = public_key.public_numbers()
            first, second = numbers.x, numbers.y
        elif isinstance(public_key, rsa.RSAPublicKey):
            numbers = public_key.public_numbers()
            first, second = numbers.e, numbers.n
        else:
            raise UserError(_("Unsupported asymmetric cryptography algorithm '%s'. Currently supported: EC, RSA.", type(public_key)))

        # NOTE(review): `_int_to_bytes` emits minimal-length bytes, so an EC
        # coordinate starting with a zero byte comes out shorter than the
        # curve size - confirm that consumers do not need fixed-width values.
        return (
            _get_formatted_value(_int_to_bytes(first), formatting=formatting),
            _get_formatted_value(_int_to_bytes(second), formatting=formatting),
        )

    @api.model
    def _generate_ec_private_key(self, company, name='id_ec', curve='SECP256R1'):
        """Generate an EC private key and store it as a new key record.

        :param company: company record the key is created for
        :param name: name of the created record
        :param curve: key of `STR_TO_CURVE`
        :return: the created `certificate.key` record
        :raises UserError: if the curve is not supported
        """
        if curve not in STR_TO_CURVE:
            raise UserError(f"Unsupported curve algorithm '{curve}'. Currently supported: SECP256R1.")

        private_key = ec.generate_private_key(STR_TO_CURVE[curve])
        pem_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        return self.env['certificate.key'].create({
            'name': name,
            'content': base64.b64encode(pem_bytes),
            'company_id': company.id,
        })

    @api.model
    def _generate_rsa_private_key(self, company, name='id_rsa', public_exponent=65537, key_size=2048):
        """Generate an RSA private key and store it as a new key record.

        :param company: company record the key is created for
        :param name: name of the created record
        :param public_exponent: RSA public exponent
        :param key_size: size of the key, in bits
        :return: the created `certificate.key` record
        """
        private_key = rsa.generate_private_key(
            public_exponent=public_exponent,
            key_size=key_size,
        )
        pem_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        return self.env['certificate.key'].create({
            'name': name,
            'content': base64.b64encode(pem_bytes),
            'company_id': company.id,
        })
|