Odoo18-Base/addons/auth_oauth/models/res_users.py
2025-03-04 12:23:19 +07:00

156 lines
6.4 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
import requests
from werkzeug import http, datastructures
# Pick the WWW-Authenticate header parser exposed by the installed werkzeug:
# use ``WWWAuthenticate.from_header`` when the class provides it, otherwise
# fall back to the module-level ``http.parse_www_authenticate_header``.
try:
    parse_auth = datastructures.WWWAuthenticate.from_header
except AttributeError:
    parse_auth = http.parse_www_authenticate_header
from odoo import api, fields, models
from odoo.exceptions import AccessDenied, UserError
from odoo.addons.auth_signup.models.res_users import SignupError
from odoo.addons import base
# Module-level side effect: add the stored OAuth token to the base module's
# USER_PRIVATE_FIELDS list.
# NOTE(review): presumably this keeps the token out of regular user reads --
# confirm against base.models.res_users.
base.models.res_users.USER_PRIVATE_FIELDS.append('oauth_access_token')
class ResUsers(models.Model):
    """Extend ``res.users`` with OAuth provider linkage and token sign-in.

    Each user may be bound to one OAuth provider and a provider-side user id;
    the last access token received for the user is stored so that it can be
    used as a credential in :meth:`_check_credentials`.
    """
    _inherit = 'res.users'

    oauth_provider_id = fields.Many2one('auth.oauth.provider', string='OAuth Provider')
    oauth_uid = fields.Char(string='OAuth User ID', help="Oauth Provider user_id", copy=False)
    oauth_access_token = fields.Char(string='OAuth Access Token', readonly=True, copy=False, prefetch=False)

    _sql_constraints = [
        ('uniq_users_oauth_provider_oauth_uid', 'unique(oauth_provider_id, oauth_uid)', 'OAuth UID must be unique per provider'),
    ]

    def _auth_oauth_rpc(self, endpoint, access_token):
        """Call an OAuth provider endpoint with the given access token.

        The token is sent as a ``Bearer`` ``Authorization`` header when the
        ``auth_oauth.authorization_header`` config parameter is set, and as an
        ``access_token`` query parameter otherwise.

        :param endpoint: URL to query (str)
        :param access_token: OAuth access token (str)
        :return: decoded JSON body when the response is OK; otherwise the
                 bearer ``WWW-Authenticate`` challenge as a dict if it carries
                 an ``error`` entry, else ``{'error': 'invalid_request'}``
        """
        if self.env['ir.config_parameter'].sudo().get_param('auth_oauth.authorization_header'):
            response = requests.get(endpoint, headers={'Authorization': 'Bearer %s' % access_token}, timeout=10)
        else:
            response = requests.get(endpoint, params={'access_token': access_token}, timeout=10)

        if response.ok:  # nb: could be a successful failure
            return response.json()

        auth_challenge = parse_auth(response.headers.get("WWW-Authenticate"))
        if auth_challenge and auth_challenge.type == 'bearer' and 'error' in auth_challenge:
            return dict(auth_challenge)

        return {'error': 'invalid_request'}

    @api.model
    def _auth_oauth_validate(self, provider, access_token):
        """Return the validation data corresponding to the access token.

        The provider's validation endpoint is queried first; when the provider
        also defines a data endpoint, its payload is merged into the result.
        The subject identifier is normalised under the ``user_id`` key.

        :param provider: oauth provider id (int)
        :param access_token: OAuth access token (str)
        :return: validation data with a ``user_id`` entry (dict)
        :raise: Exception if the validation endpoint reports an error,
                AccessDenied if no subject identity can be found
        """
        oauth_provider = self.env['auth.oauth.provider'].browse(provider)
        validation = self._auth_oauth_rpc(oauth_provider.validation_endpoint, access_token)
        if validation.get("error"):
            raise Exception(validation['error'])
        if oauth_provider.data_endpoint:
            data = self._auth_oauth_rpc(oauth_provider.data_endpoint, access_token)
            validation.update(data)
        # unify subject key, pop all possible and get most sensible. When this
        # is reworked, BC should be dropped and only the `sub` key should be
        # used (here, in _generate_signup_values, and in _auth_oauth_signin)
        # A list (not a generator) is built on purpose so that every candidate
        # key is popped from `validation`, not only the first one found.
        subject = next(filter(None, [
            validation.pop(key, None)
            for key in [
                'sub',  # standard
                'id',  # google v1 userinfo, facebook opengraph
                'user_id',  # google tokeninfo, odoo (tokeninfo)
            ]
        ]), None)
        if not subject:
            raise AccessDenied('Missing subject identity')
        validation['user_id'] = subject

        return validation

    @api.model
    def _generate_signup_values(self, provider, validation, params):
        """Build the values used to sign up a user from OAuth validation data.

        When the validation data has no ``email`` key, a synthetic login of the
        form ``provider_<provider>_user_<oauth_uid>`` is used; when it has no
        ``name`` key, the email is used as the name.

        :param provider: oauth provider id (int)
        :param validation: result of validation of access token (dict)
        :param params: oauth parameters, must contain ``access_token`` (dict)
        :return: values for the signup of the user (dict)
        """
        oauth_uid = validation['user_id']
        email = validation.get('email', 'provider_%s_user_%s' % (provider, oauth_uid))
        name = validation.get('name', email)
        return {
            'name': name,
            'login': email,
            'email': email,
            'oauth_provider_id': provider,
            'oauth_uid': oauth_uid,
            'oauth_access_token': params['access_token'],
            'active': True,
        }

    @api.model
    def _auth_oauth_signin(self, provider, validation, params):
        """ retrieve and sign in the user corresponding to provider and validated access token

            :param provider: oauth provider id (int)
            :param validation: result of validation of access token (dict)
            :param params: oauth parameters (dict)
            :return: user login (str), or None when no user matches and the
                     ``no_user_creation`` context key is set
            :raise: AccessDenied if signin failed

            This method can be overridden to add alternative signin methods.
        """
        oauth_uid = validation['user_id']
        try:
            oauth_user = self.search([("oauth_uid", "=", oauth_uid), ('oauth_provider_id', '=', provider)])
            if not oauth_user:
                raise AccessDenied()
            # Explicit singleton check instead of `assert`: assertions are
            # stripped under `python -O`, which would let the token be written
            # to several users before failing on `.login`.
            oauth_user.ensure_one()
            oauth_user.write({'oauth_access_token': params['access_token']})
            return oauth_user.login
        except AccessDenied as access_denied_exception:
            if self.env.context.get('no_user_creation'):
                return None
            # No matching user: try to sign one up, using the optional signup
            # token carried in the `t` entry of the JSON-encoded state.
            state = json.loads(params['state'])
            token = state.get('t')
            values = self._generate_signup_values(provider, validation, params)
            try:
                login, _ = self.signup(values, token)
                return login
            except (SignupError, UserError):
                # Signup failed: report the original access denial.
                raise access_denied_exception

    @api.model
    def auth_oauth(self, provider, params):
        """Validate an OAuth access token and sign in the matching user.

        :param provider: oauth provider id (int)
        :param params: oauth parameters, expected to hold ``access_token`` (dict)
        :return: ``(dbname, login, access_token)`` credentials tuple
        :raise: AccessDenied if the token is missing/empty or no login could
                be obtained
        """
        # Advice by Google (to avoid Confused Deputy Problem)
        # if validation.audience != OUR_CLIENT_ID:
        #   abort()
        # else:
        #   continue with the process
        access_token = params.get('access_token')
        if not access_token:
            # An empty token can never pass _check_credentials; refuse early
            # instead of querying the provider without a token.
            raise AccessDenied()
        validation = self._auth_oauth_validate(provider, access_token)

        # retrieve and sign in user
        login = self._auth_oauth_signin(provider, validation, params)
        if not login:
            raise AccessDenied()
        # return user credentials
        return (self.env.cr.dbname, login, access_token)

    def _check_credentials(self, credential, env):
        """Accept the stored OAuth access token as a credential.

        The parent check runs first. Only when it raises AccessDenied and the
        credential is a non-empty ``oauth_token`` is the token compared with
        the ``oauth_access_token`` stored on the current user.

        :param credential: credential description with ``type`` and, for
                           OAuth, ``token`` entries (dict)
        :param env: authentication environment, read for ``interactive`` (dict)
        :return: authentication info (dict)
        :raise: AccessDenied (re-raised from the parent) if the token check
                does not succeed
        """
        try:
            return super()._check_credentials(credential, env)
        except AccessDenied:
            if not (credential['type'] == 'oauth_token' and credential['token']):
                raise
            passwd_allowed = env['interactive'] or not self.env.user._rpc_api_keys_only()
            if passwd_allowed and self.env.user.active:
                res = self.sudo().search([('id', '=', self.env.uid), ('oauth_access_token', '=', credential['token'])])
                if res:
                    return {
                        'uid': self.env.user.id,
                        'auth_method': 'oauth',
                        'mfa': 'default',
                    }
            # Token did not match (or token auth not allowed): propagate the
            # parent's AccessDenied.
            raise

    def _get_session_token_fields(self):
        """Return the parent's session token fields plus ``oauth_access_token``."""
        return super()._get_session_token_fields() | {'oauth_access_token'}