190 lines
7.8 KiB
Python
190 lines
7.8 KiB
Python
|
import base64
|
||
|
import json
|
||
|
import logging
|
||
|
from werkzeug.urls import url_parse
|
||
|
|
||
|
from odoo import api, Command, fields, models, _
|
||
|
from odoo.exceptions import AccessDenied
|
||
|
from odoo.http import request
|
||
|
from odoo.tools import sql, SQL
|
||
|
|
||
|
from odoo.addons.base.models.res_users import check_identity
|
||
|
|
||
|
from .._vendor.webauthn import base64url_to_bytes, generate_authentication_options, generate_registration_options, options_to_json, verify_authentication_response, verify_registration_response
|
||
|
from .._vendor.webauthn.helpers import bytes_to_base64url
|
||
|
from .._vendor.webauthn.helpers.structs import AuthenticatorSelectionCriteria, ResidentKeyRequirement, UserVerificationRequirement
|
||
|
|
||
|
# Module-level logger named after this module; used for the passkey audit messages below.
_logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class PassKey(models.Model):
    """Passkey record plus the helpers driving the WebAuthn ceremonies.

    The public key is kept in a ``public_key`` database column that is
    created by hand in :meth:`init`; the ORM field of the same name only
    reads that column back through :meth:`_compute_public_key`.
    """
    _name = 'auth.passkey.key'
    _description = 'Passkey'
    _order = 'id desc'

    name = fields.Char(required=True)
    credential_identifier = fields.Char(required=True, groups='base.group_system')
    # Computed field without `store=True`: its value is fetched with raw SQL
    # by `_compute_public_key`, and `_inverse_public_key` is a no-op.
    public_key = fields.Char(
        required=True,
        groups='base.group_system',
        compute='_compute_public_key',
        inverse='_inverse_public_key',
    )
    sign_count = fields.Integer(default=0, groups='base.group_system')

    _sql_constraints = [
        ('unique_identifier', 'UNIQUE(credential_identifier)', 'The credential identifier should be unique.'),
    ]

    def init(self):
        """Add the ``public_key`` column to the table when it is missing."""
        super().init()
        # `public_key` is declared as a computed field, so the column backing
        # it is created manually here instead of by the field definition.
        if not sql.column_exists(self.env.cr, 'auth_passkey_key', 'public_key'):
            self.env.cr.execute(SQL('ALTER TABLE auth_passkey_key ADD COLUMN public_key varchar'))

    def unlink(self):
        """Log who deletes each passkey (and from which address), then delete.

        :return: the result of the parent ``unlink``.
        """
        for passkey in self:
            _logger.info(
                "Passkey (#%d) deleted by %s (#%d) from %s",
                passkey.id,
                self.env.user.login, self.env.user.id,
                # There may be no HTTP request bound to this call.
                request.httprequest.environ['REMOTE_ADDR'] if request else 'n/a'
            )
        return super().unlink()

    def _compute_public_key(self):
        """Read ``public_key`` for all records of ``self`` from the database.

        A single query fetches every row at once. Records for which no row
        is found get ``False`` instead of making the compute fail with a
        ``TypeError`` on a missing result.
        """
        self.env.cr.execute(SQL(
            'SELECT id, public_key FROM auth_passkey_key WHERE id = ANY(%s)',
            list(self.ids),
        ))
        stored_keys = dict(self.env.cr.fetchall())
        for passkey in self:
            passkey.public_key = stored_keys.get(passkey.id, False)

    def _inverse_public_key(self):
        """Do nothing: the column is written with raw SQL by the code creating the key."""
        pass

    @api.model
    def _get_session_challenge(self):
        """Remove and return the WebAuthn challenge stored in the session.

        The challenge is popped, so each stored challenge can be retrieved
        only once.

        :return: the value stored under ``webauthn_challenge``.
        :raises AccessDenied: if the session holds no (or an empty) challenge.
        """
        challenge = request.session.pop('webauthn_challenge', None)
        if not challenge:
            raise AccessDenied('Cannot find a challenge for this session')
        return challenge

    @api.model
    def _start_auth(self):
        """Generate WebAuthn authentication options and remember their challenge.

        The relying party id is the host of this database's base URL and user
        verification is required. The generated challenge is stored in the
        session under ``webauthn_challenge``.

        :return: the authentication options, as a JSON-decoded dict.
        """
        # The challenge is stored in the session, so an HTTP request is needed.
        assert request
        authentication_options = json.loads(options_to_json(generate_authentication_options(
            rp_id=url_parse(self.get_base_url()).host,
            user_verification=UserVerificationRequirement.REQUIRED,
        )))
        request.session['webauthn_challenge'] = authentication_options['challenge']
        return authentication_options

    @api.model
    def _verify_auth(self, auth, public_key, sign_count):
        """Verify an authentication response against the session challenge.

        :param auth: the credential handed to ``verify_authentication_response``.
        :param public_key: base64url-encoded public key of the credential.
        :param sign_count: the currently known signature counter of the credential.
        :return: the new signature counter reported by the verification.
        :raises AccessDenied: if the session holds no challenge.
        """
        parsed_url = url_parse(self.get_base_url())
        auth_verification = verify_authentication_response(
            credential=auth,
            expected_challenge=base64url_to_bytes(self._get_session_challenge()),
            # Origin = base URL stripped of its path.
            expected_origin=parsed_url.replace(path='').to_url(),
            expected_rp_id=parsed_url.host,
            credential_public_key=base64url_to_bytes(public_key),
            credential_current_sign_count=sign_count,
            require_user_verification=True,
        )
        return auth_verification.new_sign_count

    @api.model
    def _start_registration(self):
        """Generate WebAuthn registration options for the current user.

        The options require a resident key and user verification, use the
        current user's id (as bytes) and login, and their challenge is stored
        in the session under ``webauthn_challenge``.

        :return: the registration options, as a JSON-decoded dict.
        """
        # The challenge is stored in the session, so an HTTP request is needed.
        assert request
        registration_options = json.loads(options_to_json(generate_registration_options(
            rp_id=url_parse(self.get_base_url()).host,
            rp_name='Odoo',
            user_id=str(self.env.user.id).encode(),
            user_name=self.env.user.login,
            authenticator_selection=AuthenticatorSelectionCriteria(
                resident_key=ResidentKeyRequirement.REQUIRED,
                user_verification=UserVerificationRequirement.REQUIRED
            )
        )))
        request.session['webauthn_challenge'] = registration_options['challenge']
        return registration_options

    @api.model
    def _verify_registration_options(self, registration):
        """Verify a registration response against the session challenge.

        :param registration: the credential handed to ``verify_registration_response``.
        :return: dict with the verified ``credential_id`` and
            ``credential_public_key``.
        :raises AccessDenied: if the session holds no challenge.
        """
        parsed_url = url_parse(self.get_base_url())
        verification = verify_registration_response(
            credential=registration,
            expected_challenge=base64url_to_bytes(self._get_session_challenge()),
            # Origin = base URL stripped of its path.
            expected_origin=parsed_url.replace(path='').to_url(),
            expected_rp_id=parsed_url.host,
            require_user_verification=True,
        )
        return {
            'credential_id': verification.credential_id,
            'credential_public_key': verification.credential_public_key,
        }

    @check_identity
    def action_delete_passkey(self):
        """Delete the passkeys of ``self`` that were created by the current user.

        Keys created by somebody else are left untouched and the attempt is
        logged. After each deletion the session token is recomputed and
        stored in the current session.
        """
        for key in self:
            if key.create_uid.id == self.env.user.id:
                # Force to go through `res.users.auth_passkey_key_ids` to trigger the session token cache invalidation
                # See `res.users.write` and `_get_invalidation_fields`
                # `self.env.user` is already sudo, so no need to re-apply `sudo` to get delete access right.
                self.env.user.write({'auth_passkey_key_ids': [Command.delete(key.id)]})
                new_token = self.env.user._compute_session_token(request.session.sid)
                request.session.session_token = new_token
            else:
                _logger.info(
                    "%s (#%d) attempted to delete passkey (#%d) belonging to %s (#%d) from %s but was denied.",
                    self.env.user.login, self.env.user.id,
                    key.id,
                    key.create_uid.login, key.create_uid.id,
                    request.httprequest.environ['REMOTE_ADDR'] if request else 'n/a'
                )

    def action_rename_passkey(self):
        """Return the window action opening the rename dialog for this passkey.

        :return: an ``ir.actions.act_window`` dict targeting this record.
        :raises ValueError: if ``self`` is not exactly one record.
        """
        # Without this, an empty recordset would yield `res_id: False` and the
        # dialog would not be bound to any existing passkey.
        self.ensure_one()
        return {
            'name': _('Rename Passkey'),
            'type': 'ir.actions.act_window',
            'res_model': 'auth.passkey.key',
            'view_id': self.env.ref('auth_passkey.auth_passkey_key_rename').id,
            'view_mode': 'form',
            'target': 'new',
            'res_id': self.id,
            'context': {
                'dialog_size': 'medium',
            }
        }
|
||
|
|
||
|
|
||
|
class PassKeyCreate(models.TransientModel):
    """Transient model used to register a new passkey for the current user."""
    _name = 'auth.passkey.key.create'
    _description = 'Create a Passkey'

    name = fields.Char('Name', required=True)

    @check_identity
    def make_key(self, registration=None):
        """Verify a registration response and store the resulting passkey.

        The key is created through the current user's
        ``auth_passkey_key_ids``, its public key is written with raw SQL, the
        creation is logged and the session token is refreshed.

        :param registration: the registration response to verify; it must
            not be empty.
        :return: ``True``
        """
        # `registration` is added with JS; it only has a default value
        # because the XML validation would otherwise emit a warning.
        assert registration, "registration can not be empty"
        self.ensure_one()

        verified = request.env['auth.passkey.key']._verify_registration_options(registration)
        current_user = self.env.user

        # Create through `res.users.auth_passkey_key_ids` so that the session
        # token cache gets invalidated (see `res.users.write` and
        # `_get_invalidation_fields`). `self.env.user` is already sudo, hence
        # no extra `sudo` is needed to be allowed to create the key.
        key_values = {
            'name': self.name,
            'credential_identifier': bytes_to_base64url(verified['credential_id']),
        }
        current_user.write({'auth_passkey_key_ids': [Command.create(key_values)]})

        # First key of the user; presumably the one just created, given the
        # `id desc` ordering declared on `auth.passkey.key`.
        passkey = current_user.auth_passkey_key_ids[0]
        encoded_public_key = base64.urlsafe_b64encode(verified['credential_public_key']).decode()
        store_public_key = SQL(
            "UPDATE auth_passkey_key SET public_key = %s WHERE id = %s",
            encoded_public_key,
            passkey.id,
        )
        self.env.cr.execute(store_public_key)

        _logger.info(
            "Passkey (#%d) created by %s (#%d) from %s",
            passkey.id,
            current_user.login, current_user.id,
            request.httprequest.environ['REMOTE_ADDR'] if request else 'n/a'
        )

        # The user's passkeys changed: store a freshly computed token in the session.
        request.session.session_token = current_user._compute_session_token(request.session.sid)
        return True
|