# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import json import requests from werkzeug import http, datastructures if hasattr(datastructures.WWWAuthenticate, "from_header"): parse_auth = datastructures.WWWAuthenticate.from_header else: parse_auth = http.parse_www_authenticate_header from odoo import api, fields, models from odoo.exceptions import AccessDenied, UserError from odoo.addons.auth_signup.models.res_users import SignupError from odoo.addons import base base.models.res_users.USER_PRIVATE_FIELDS.append('oauth_access_token') class ResUsers(models.Model): _inherit = 'res.users' oauth_provider_id = fields.Many2one('auth.oauth.provider', string='OAuth Provider') oauth_uid = fields.Char(string='OAuth User ID', help="Oauth Provider user_id", copy=False) oauth_access_token = fields.Char(string='OAuth Access Token', readonly=True, copy=False, prefetch=False) _sql_constraints = [ ('uniq_users_oauth_provider_oauth_uid', 'unique(oauth_provider_id, oauth_uid)', 'OAuth UID must be unique per provider'), ] def _auth_oauth_rpc(self, endpoint, access_token): if self.env['ir.config_parameter'].sudo().get_param('auth_oauth.authorization_header'): response = requests.get(endpoint, headers={'Authorization': 'Bearer %s' % access_token}, timeout=10) else: response = requests.get(endpoint, params={'access_token': access_token}, timeout=10) if response.ok: # nb: could be a successful failure return response.json() auth_challenge = parse_auth(response.headers.get("WWW-Authenticate")) if auth_challenge and auth_challenge.type == 'bearer' and 'error' in auth_challenge: return dict(auth_challenge) return {'error': 'invalid_request'} @api.model def _auth_oauth_validate(self, provider, access_token): """ return the validation data corresponding to the access token """ oauth_provider = self.env['auth.oauth.provider'].browse(provider) validation = self._auth_oauth_rpc(oauth_provider.validation_endpoint, access_token) if validation.get("error"): raise Exception(validation['error']) if oauth_provider.data_endpoint: data = self._auth_oauth_rpc(oauth_provider.data_endpoint, access_token) validation.update(data) # unify subject key, pop all possible and get most sensible. When this # is reworked, BC should be dropped and only the `sub` key should be # used (here, in _generate_signup_values, and in _auth_oauth_signin) subject = next(filter(None, [ validation.pop(key, None) for key in [ 'sub', # standard 'id', # google v1 userinfo, facebook opengraph 'user_id', # google tokeninfo, odoo (tokeninfo) ] ]), None) if not subject: raise AccessDenied('Missing subject identity') validation['user_id'] = subject return validation @api.model def _generate_signup_values(self, provider, validation, params): oauth_uid = validation['user_id'] email = validation.get('email', 'provider_%s_user_%s' % (provider, oauth_uid)) name = validation.get('name', email) return { 'name': name, 'login': email, 'email': email, 'oauth_provider_id': provider, 'oauth_uid': oauth_uid, 'oauth_access_token': params['access_token'], 'active': True, } @api.model def _auth_oauth_signin(self, provider, validation, params): """ retrieve and sign in the user corresponding to provider and validated access token :param provider: oauth provider id (int) :param validation: result of validation of access token (dict) :param params: oauth parameters (dict) :return: user login (str) :raise: AccessDenied if signin failed This method can be overridden to add alternative signin methods. """ oauth_uid = validation['user_id'] try: oauth_user = self.search([("oauth_uid", "=", oauth_uid), ('oauth_provider_id', '=', provider)]) if not oauth_user: raise AccessDenied() assert len(oauth_user) == 1 oauth_user.write({'oauth_access_token': params['access_token']}) return oauth_user.login except AccessDenied as access_denied_exception: if self.env.context.get('no_user_creation'): return None state = json.loads(params['state']) token = state.get('t') values = self._generate_signup_values(provider, validation, params) try: login, _ = self.signup(values, token) return login except (SignupError, UserError): raise access_denied_exception @api.model def auth_oauth(self, provider, params): # Advice by Google (to avoid Confused Deputy Problem) # if validation.audience != OUR_CLIENT_ID: # abort() # else: # continue with the process access_token = params.get('access_token') validation = self._auth_oauth_validate(provider, access_token) # retrieve and sign in user login = self._auth_oauth_signin(provider, validation, params) if not login: raise AccessDenied() # return user credentials return (self.env.cr.dbname, login, access_token) def _check_credentials(self, credential, env): try: return super()._check_credentials(credential, env) except AccessDenied: if not (credential['type'] == 'oauth_token' and credential['token']): raise passwd_allowed = env['interactive'] or not self.env.user._rpc_api_keys_only() if passwd_allowed and self.env.user.active: res = self.sudo().search([('id', '=', self.env.uid), ('oauth_access_token', '=', credential['token'])]) if res: return { 'uid': self.env.user.id, 'auth_method': 'oauth', 'mfa': 'default', } raise def _get_session_token_fields(self): return super(ResUsers, self)._get_session_token_fields() | {'oauth_access_token'}