diff --git a/runbot/controllers/__init__.py b/runbot/controllers/__init__.py index 96d149ab..a3adbc61 100644 --- a/runbot/controllers/__init__.py +++ b/runbot/controllers/__init__.py @@ -3,3 +3,4 @@ from . import frontend from . import hook from . import badge +from . import public_api diff --git a/runbot/controllers/public_api.py b/runbot/controllers/public_api.py new file mode 100644 index 00000000..ff6f61ab --- /dev/null +++ b/runbot/controllers/public_api.py @@ -0,0 +1,87 @@ +import json + +from werkzeug.exceptions import BadRequest, Forbidden + +from odoo.exceptions import AccessError +from odoo.http import Controller, request, route +from odoo.tools import mute_logger + +from odoo.addons.runbot.models.public_model_mixin import PublicModelMixin + + +class PublicApi(Controller): + + @mute_logger('odoo.addons.base.models.ir_model') # We don't care about logging acl errors + def _get_model(self, model: str) -> PublicModelMixin: + """ + Returns the model from a model string. + + Raises the appropriate exception if: + - The model does not exist + - The model is not a public model + - The current user can not read the model + """ + pool = request.env.registry + try: + Model = pool[model] + except KeyError: + raise BadRequest('Unknown model') + if not issubclass(Model, pool['runbot.public.model.mixin']): + raise BadRequest('Unknown model') + Model = request.env[model] + Model.check_access('read') + if not Model._api_request_allow_direct_access(): + raise Forbidden('This model does not allow direct access') + return Model + + @route('/runbot/api/models', auth='public', methods=['GET'], readonly=True) + def models(self): + models = [] + for model in request.env.keys(): + try: + models.append(self._get_model(model)) + except (BadRequest, AccessError, Forbidden): + pass + return request.make_json_response( + [Model._name for Model in models] + ) + + @route('/runbot/api//read', auth='public', methods=['POST'], readonly=True, csrf=False) + def read(self, *, model: str): + Model = self._get_model(model) + required_keys = Model._api_request_required_keys() + allowed_keys = Model._api_request_allowed_keys() + try: + data = request.get_json_data() + except json.JSONDecodeError: + raise BadRequest('Invalid payload, missing or malformed json') + if not isinstance(data, dict): + raise BadRequest('Invalid payload, should be a dict.') + if (missing_keys := required_keys - set(data.keys())): + raise BadRequest(f'Invalid payload, missing keys: {", ".join(missing_keys)}') + if (unknown_keys := set(data.keys()) - allowed_keys): + raise BadRequest(f'Invalid payload, unknown keys: {", ".join(unknown_keys)}') + if Model._api_request_requires_project(): + if not isinstance(data['project_id'], int): + raise BadRequest('Invalid project_id, should be an int') + # This is an additional layer of protection for project_id + project = request.env['runbot.project'].browse(data['project_id']).exists() + if not project: + raise BadRequest('Unknown project_id') + project.check_access('read') + Model = Model.with_context(project_id=project.id) + return request.make_json_response(Model._api_request_read(data)) + + @route('/runbot/api//spec', auth='public', methods=['GET'], readonly=True) + def spec(self, *, model: str): + Model = self._get_model(model) + required_keys = Model._api_request_required_keys() + allowed_keys = Model._api_request_allowed_keys() + return request.make_json_response({ + 'requires_project': Model._api_request_requires_project(), + 'default_page_size': Model._api_request_default_limit(), + 'max_page_size': Model._api_request_max_limit(), + 'required_keys': list(Model._api_request_required_keys()), + 'allowed_keys': list(allowed_keys - required_keys), + 'specification': self._get_model(model)._api_public_specification(), + }) diff --git a/runbot/models/__init__.py b/runbot/models/__init__.py index dbd376be..c755616b 100644 --- a/runbot/models/__init__.py +++ b/runbot/models/__init__.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- +from . import public_model_mixin + from . import batch from . import branch from . import build diff --git a/runbot/models/public_model_mixin.py b/runbot/models/public_model_mixin.py new file mode 100644 index 00000000..75c1e628 --- /dev/null +++ b/runbot/models/public_model_mixin.py @@ -0,0 +1,315 @@ +from __future__ import annotations + +from werkzeug.exceptions import BadRequest, Forbidden + +from typing import Dict, Union, List, Self, TypedDict + +from odoo import models, api, fields, tools +from odoo.osv import expression + + +class SubSpecification(TypedDict): + context: Dict + fields: Dict[str, 'SubSpecification'] +Specification = Dict[str, Union[Dict, 'SubSpecification']] + +SUPPORTED_FIELD_TYPES = { # Perhaps this should be a list of class instead + 'boolean', 'integer', 'float', 'char', 'text', 'html', + 'date', 'datetime', 'selection', 'jsonb', + 'many2one', 'one2many', 'many2many', +} +RELATIONAL_FIELD_TYPES = {'many2one', 'one2many', 'many2many'} +SPEC_MAX_DEPTH = 10 +SPEC_METADATA_FIELD = { + '__type', '__help', +} +DEFAULT_LIMIT = 20 +DEFAULT_MAX_LIMIT = 60 + +def _cleaned_spec(spec: Specification | SubSpecification) -> Specification | SubSpecification: + """ Returns the specification without metadata fields. """ + if not isinstance(spec, dict): + return spec + return { + k: v for k, v in spec.items() + if k not in SPEC_METADATA_FIELD + } + +class PublicModelMixin(models.AbstractModel): + _name = 'runbot.public.model.mixin' + _description = 'Mixin for publicly accessible data' + + @api.model + def _valid_field_parameter(self, field: fields.Field, name: str): + if field.type in SUPPORTED_FIELD_TYPES: + return name in ( + # boolean, whether the field is readable through the public api, + # public fields on record on which the user does not have access are not exposed. + 'public', + ) or super()._valid_field_parameter(field, name) + return super()._valid_field_parameter(field, name) + + @api.model + def _get_public_fields(self) -> List[fields.Field]: + """ Returns a list of publicly readable fields. """ + return [ + field for field in self._fields.values() + if getattr(field, 'public', None) or field.name == 'id' + ] + + ########## REQUESTS ########## + + @api.model + def _api_request_allow_direct_access(self) -> bool: + """ Returns whether this model is accessible directly through the api. """ + return True + + @api.model + def _api_request_allowed_keys(self) -> set[str]: + """ Returns a list of allowed keys for request_data. """ + return self._api_request_required_keys() | { + 'context', + 'limit', 'offset', + } + + @api.model + def _api_request_default_limit(self) -> int: + return DEFAULT_LIMIT + + @api.model + def _api_request_max_limit(self) -> int: + return DEFAULT_MAX_LIMIT + + @api.model + def _api_request_required_keys(self) -> set[str]: + """ Returns a list of required keys for request_data. """ + required_keys = {'specification', 'domain'} + if self._api_request_requires_project(): + required_keys.add('project_id') + return required_keys + + @api.model + def _api_request_requires_project(self) -> bool: #TODO: rename me + """ Public models are by default based on a project_id (filtered on project_id). """ + return self._api_request_allow_direct_access() + + @api.model + def _api_project_id_field_path(self) -> str: + """ Returns the path from the current object to project_id. """ + raise NotImplementedError('_api_project_id_field_path not implemented') + + @api.model + def _api_request_validate_domain(self, domain: list[str | tuple | list]): + """ + Validates a domain against the public spec. + + This only validates that all the fields in the domain are queryable fields, + the actual validity of the domain will be checked by the orm when + searching for records. + + Returns: + domain: a transformed domain if necessary + + Raises: + AssertionError: unknown domain leaf + Forbidden: invalid field used + """ + + try: + self._where_calc(domain) + except ValueError as e: + raise BadRequest('Invalid domain') from e + + spec: Specification = self._api_public_specification() + # recompiles the spec into a list of fields that can be present in the domain + valid_fields: str[str] = set() + def _visit_spec(spec, prefix: str | None = None): + spec = _cleaned_spec(spec) + if not spec: + return + for field, sub_spec in spec.items(): + this_field = f'{prefix}.{field}' if prefix else field + valid_fields.add(this_field) + if sub_spec and sub_spec.get('fields'): + _visit_spec(sub_spec['fields'], prefix=this_field) + _visit_spec(spec) + + for leaf in domain: + if not isinstance(leaf, (tuple, list)): + continue + assert len(leaf) == 3 # Can this happen in a valid domain? + if leaf[0] not in valid_fields and not self.env.user.has_group('runbot.group_runbot_admin'): + raise Forbidden('Trying to filter from private field') + + if self._api_request_requires_project(): + assert 'project_id' in self.env.context + domain = expression.AND([ + [(self._api_project_id_field_path(), '=', self.env.context['project_id'])], + domain + ]) + + return domain + + @api.model + def _api_request_read_get_offset_limit(self, request_data: dict) -> tuple[int, int]: + if 'limit' in request_data: + if not isinstance(request_data['limit'], int): + raise BadRequest('Invalid page size (should be int)') + limit = request_data['limit'] + if limit > self._api_request_max_limit(): + raise BadRequest('Page size exceeds max size') + else: + limit = self._api_request_default_limit() + offset = 0 + if 'offset' in request_data: + if not isinstance(request_data['offset'], int): + raise BadRequest('Invalid page (should be int)') + offset = request_data['offset'] + return limit, offset + + @api.model + def _api_request_read_get_records(self, request_data: dict) -> Self: + limit, offset = self._api_request_read_get_offset_limit(request_data) + return self.search(request_data['domain'], limit=limit, offset=offset) + + @api.model + def _api_request_read(self, request_data: dict) -> list[dict]: + """ + Processes a frontend request and returns the data to be returned by the controller. + + This method is allowed to raise Http specific exceptions. + """ + specification, domain = request_data['specification'], request_data['domain'] + + try: + if not self._api_verify_specification(specification) and\ + not self.env.user.has_group('runbot.group_runbot_admin'): + raise Forbidden('Invalid specification or trying to access private data.') + except (ValueError, AssertionError) as e: + raise BadRequest('Invalid specification') from e + + request_data['domain'] = self._api_request_validate_domain(domain) + records = self._api_request_read_get_records(request_data) + + return records._api_read(request_data['specification']) + + ########## SPEC ########## + + @api.model + def _api_get_relation_field_key(self, field: fields.Field): + """ Returns a relation cache key for a field, a string defining the identity of the relationship. """ + if isinstance(field, fields.Many2one): + return f'{self._name}__{field.name}' + elif isinstance(field, fields.Many2many): + if not field.store: + return f'{self._name}__{field.name}' + return field.relation + elif isinstance(field, fields.One2many): + if not field.store: # is this valid? + return f'{self._name}__{field.name}' + CoModel: PublicModelMixin = self.env[field.comodel_name] + inverse_field = CoModel._fields[field.inverse_name] + return CoModel._api_get_relation_field_key(inverse_field) + raise NotImplementedError('Unsupported field') + + @tools.ormcache() + @api.model + def _api_public_specification(self) -> Specification: + """ + Returns the public specification for the model. + + The specification will go through all the fields marked as public. + For relational fields, the result will be nested (up to a depth of :code:`SPEC_MAX_DEPTH`). + + The specification will contain metadata about each fields. + The specification returned by this method can be used directly with :code:`_api_read`. + + Returns: + specification: The specification as a dictionary. + """ + # We want to prevent infinite loops so we need to track which relations + # have already been explored, this concerns many2one, many2many + def _visit_model(model: PublicModelMixin, visited_relations: set[str], depth = 0) -> Specification | SubSpecification: + spec: Specification | SubSpecification = {} + for field in model._get_public_fields(): + field_metadata = { + '__type': field.type, + } + if field.help: + field_metadata['__help'] = field.help + if field.relational and \ + issubclass(self.pool[field.comodel_name], PublicModelMixin): + field_key = model._api_get_relation_field_key(field) + if field_key in visited_relations or depth == SPEC_MAX_DEPTH: + continue + visited_relations.add(field_key) + CoModel: PublicModelMixin = model.env[field.comodel_name] + field_metadata.update( + fields=_visit_model(CoModel, {*visited_relations}, depth + 1) + ) + spec[field.name] = field_metadata + return spec + + return _visit_model(self, set()) + + @api.model + def _api_verify_specification(self, specification: Specification) -> bool: + """ + Verifies a given specification against the public specification. + + This step also provides some validation of the specification, enough that + the spec can be safely used with `_api_read` if the method does not + raise an exception. + + Args: + specification: The requested specification. + + Returns: + If the spec matches the public spec this method returns True + otherwise False. + + Raises: + ValueError: If a sub spec is given for a non relational field. + ValueError: If a sub spec is given for a relational field that does + not allow public data (id only). + """ + public_specification: Specification = self._api_public_specification() + + def _visit_spec( + model_spec: Specification, + request_spec: Specification, + ) -> bool: + request_spec = _cleaned_spec(request_spec) + for field, sub_spec in request_spec.items(): + sub_spec = _cleaned_spec(sub_spec) + if field not in model_spec: + return False + if not isinstance(sub_spec, dict): + raise ValueError( + 'Invalid sub spec, should be a dict.' + ) + # For now we actually only have keys for relational fields. + sub_spec_allowed_keys = set() + if model_spec[field].get('__type') in RELATIONAL_FIELD_TYPES\ + and 'fields' in model_spec[field]: + sub_spec_allowed_keys.add('fields') + sub_spec_allowed_keys.add('context') + if set(sub_spec.keys()) - sub_spec_allowed_keys: + raise ValueError( + 'Invalid sub spec, contains unknown keys.' + ) + if not sub_spec or 'fields' not in sub_spec: + continue + if 'fields' not in model_spec[field]: + raise ValueError( + f'Sub spec not available for field {field}' + ) + if not _visit_spec(model_spec[field]['fields'], sub_spec['fields']): + return False + return True + + return _visit_spec(public_specification, specification) + + def _api_read(self, specification: Specification) -> list[dict]: + """ Forwards the specification to `web_read`. """ + return self.web_read(specification) diff --git a/runbot/tests/__init__.py b/runbot/tests/__init__.py index e4410f29..85874773 100644 --- a/runbot/tests/__init__.py +++ b/runbot/tests/__init__.py @@ -16,3 +16,4 @@ from . import test_commit from . import test_upgrade from . import test_dockerfile from . import test_host +from . import test_public_api diff --git a/runbot/tests/test_public_api.py b/runbot/tests/test_public_api.py new file mode 100644 index 00000000..19293d1b --- /dev/null +++ b/runbot/tests/test_public_api.py @@ -0,0 +1,260 @@ +import json + +from werkzeug.exceptions import BadRequest, Forbidden + +from odoo.osv import expression +from odoo.tests.common import HttpCase, TransactionCase, tagged, new_test_user + +from odoo.addons.runbot.models.public_model_mixin import PublicModelMixin + + +@tagged('-at_install', 'post_install') +class TestPublicApi(HttpCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.project = cls.env['runbot.project'].create({'name': 'Tests', 'process_delay': 0}) + + def get_public_models(self): + for Model in self.registry.values(): + if not issubclass(Model, PublicModelMixin) or Model._name == 'runbot.public.model.mixin': + continue + yield self.env[Model._name] + + + def test_requires_project_defines_project_id_path(self): + for Model in self.get_public_models(): + if not Model._api_request_requires_project(): + continue + # Try calling _api_project_id_field_path, none should fail + with self.subTest(model=Model._name): + try: + Model._api_project_id_field_path() + except NotImplementedError: + self.fail('_api_project_id_field_path not implemented') + + def test_direct_access_disabled(self): + DisabledModel = self.env['runbot.commit.link'] + self.assertFalse(DisabledModel._api_request_allow_direct_access()) + + resp = self.url_open('/runbot/api/models') + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertNotIn(DisabledModel._name, data) + + resp = self.url_open(f'/runbot/api/{DisabledModel._name}/spec') + self.assertEqual(resp.status_code, 403) + + resp = self.url_open(f'/runbot/api/{DisabledModel._name}/read', data="{}", headers={'Content-Type': 'application/json'}) # model checking happens before data checking + self.assertEqual(resp.status_code, 403) + + def test_api_public_basics(self): + # This serves as a basic read test through the api + Model = self.env['runbot.bundle'] + self.assertTrue(Model._api_request_allow_direct_access()) + + resp = self.url_open('/runbot/api/models') + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertIn(Model._name, data) + + resp = self.url_open(f'/runbot/api/{Model._name}/spec') + self.assertEqual(resp.status_code, 200) + + request_data = json.dumps({ + 'domain': [], + 'specification': resp.json()['specification'], + 'project_id': self.project.id, + }) + resp = self.url_open(f'/runbot/api/{Model._name}/read', data=request_data, headers={'Content-Type': 'application/json'}) + self.assertEqual(resp.status_code, 200) + + def test_api_read_from_spec_public_models(self): + # This is not ideal as we don't have any data but it is better than nothing + for Model in self.get_public_models(): + if not Model._api_request_allow_direct_access(): + continue + with self.subTest(model=Model._name): + resp = self.url_open(f'/runbot/api/{Model._name}/spec') + self.assertEqual(resp.status_code, 200) + data = resp.json() + if set(data['required_keys']) > self.env['runbot.public.model.mixin']._api_request_required_keys(): + self.skipTest('Skipping, request requires unknown keys, create a specific test') + request_data = { + 'domain': [], + 'specification': data['specification'], + } + if Model._api_request_requires_project(): + request_data['project_id'] = self.project.id + request_data = json.dumps(request_data) + resp = self.url_open(f'/runbot/api/{Model._name}/read', data=request_data, headers={'Content-Type': 'application/json'}) + self.assertEqual(resp.status_code, 200) + + def test_api_read_homepage(self): + # Arbitrary test testing the initial schema required for the homepage + # We only check that the response is successful + request_data = json.dumps({ + 'domain': [['last_batch', '!=', False]], + 'project_id': self.project.id, + # 'category_id': False Ignored for the sake of the test + 'specification': { + "name": {}, + "branch_ids": { + "fields": { + "dname": {}, + "branch_url": {} + } + }, + "last_batchs": { + "fields": { + "age": {}, + "last_update": {}, + "slot_ids": { + "fields": { + "link_type": {}, + "trigger_id": { + "fields": { + "name": {} + } + }, + "build_id": { + "fields": { + "local_state": {}, + "local_result": {}, + "global_state": {}, + "global_result": {}, + "requested_action": {}, + "log_list": {}, + "version_id": {}, + "config_id": {}, + "trigger_id": {}, + "create_batch_id": {}, + "host_id": { + "fields": { + "name": {} + } + }, + "database_ids": { + "fields": { + "name": {} + } + } + } + } + } + }, + "commit_link_ids": { + "fields": { + "match_type": {}, + "commit_id": { + "fields": { + "dname": {}, + "subject": {} + } + } + } + } + } + } + } + }) + resp = self.url_open('/runbot/api/runbot.bundle/read', data=request_data, headers={'Content-Type': 'application/json'}) + resp.raise_for_status() + +@tagged('-at_install', 'post_install') +class TestPublicModelApi(TransactionCase): + + def setUp(self): + super().setUp() + self.project = self.env['runbot.project'].create({'name': 'Tests', 'process_delay': 0}) + self.basic_user = new_test_user(self.env, 'runbot') + self.uid = self.basic_user + # Context key used in some tests. + self.BundleModel = self.env['runbot.bundle']\ + .with_context(project_id=self.project.id)\ + .with_user(self.basic_user) + + def test_invalid_domain(self): + # Unknown field + with self.assertRaises(BadRequest): + self.BundleModel._api_request_validate_domain([['booger', '=', 1]]) + + # Private field + self.assertFalse( + getattr(self.BundleModel._fields['modules'], 'public', False), + 'modules field is not private anymore, change to another private field', + ) + with self.assertRaises(Forbidden): + self.BundleModel._api_request_validate_domain( + [('modules', '=', 1)] + ) + + def test_valid_domain_add_project_id(self): + self.assertTrue(self.BundleModel._api_request_requires_project()) + + self.assertEqual( + self.BundleModel._api_request_validate_domain([]), + [('project_id', '=', self.project.id)] + ) + + def test_valid_domain(self): + domain = [ + ('name', '=', 'master'), # Basic field + ('project_id.name', '=', 'R&D'), # 1-level related field + ('project_id.trigger_ids.name', '=', 'Enterprise run'), # 2-level related field + ] + self.assertListEqual( + self.BundleModel._api_request_validate_domain(domain), + expression.AND([ + [('project_id', '=', self.project.id)], + domain, + ]) + ) + + def test_process_read_limits(self): + request_data = { + 'domain': [], + 'specification': {}, + } + # Test with non int limit + with self.assertRaises(BadRequest): + request_data['limit'] = 'test' + self.BundleModel._api_request_read(request_data) + # Test with limit above max + with self.assertRaises(BadRequest): + request_data['limit'] = self.BundleModel._api_request_max_limit() + 10 + self.BundleModel._api_request_read(request_data) + request_data.pop('limit') + # Test with invalid offset + with self.assertRaises(BadRequest): + request_data['offset'] = 'test' + self.BundleModel._api_request_read(request_data) + + def test_verify_spec_invalid(self): + check = self.BundleModel._api_verify_specification + # Test with unknown field + self.assertFalse( + check({ + 'invalid_field': {} + }) + ) + self.assertFalse( + check({ + 'project_id': { + 'fields': { + 'invalid_field': {} + } + } + }) + ) + # Test with sub_spec not dict + with self.assertRaises(ValueError): + check({ + 'name': ['i', 'don\'t', 'know'] + }) + # Test with unknown key in dict + with self.assertRaises(ValueError): + check({ + 'name': {'fields': {}} # Non relational fields do not allow 'fields' + }) diff --git a/runbot_test/__init__.py b/runbot_test/__init__.py new file mode 100644 index 00000000..253f5724 --- /dev/null +++ b/runbot_test/__init__.py @@ -0,0 +1 @@ +from . import models as models diff --git a/runbot_test/__manifest__.py b/runbot_test/__manifest__.py new file mode 100644 index 00000000..9d4c9a71 --- /dev/null +++ b/runbot_test/__manifest__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +{ + 'name': "runbot test", + 'summary': "Runbot test", + 'description': "Runbot test", + 'author': "Odoo SA", + 'website': "http://runbot.odoo.com", + 'category': 'Website', + 'version': '1.0', + 'depends': ['runbot'], + 'license': 'LGPL-3', + 'data': [ + 'security/ir.model.access.csv', + ], +} diff --git a/runbot_test/models.py b/runbot_test/models.py new file mode 100644 index 00000000..bebcbdb4 --- /dev/null +++ b/runbot_test/models.py @@ -0,0 +1,70 @@ +from odoo import models, api, fields +from odoo.addons.runbot.fields import JsonDictField + + +class TestModelParent(models.Model): + _name = 'runbot.test.model.parent' + _inherit = ['runbot.public.model.mixin'] + _description = 'parent' + + private_field = fields.Boolean(public=False) + + field_bool = fields.Boolean(public=True) + field_integer = fields.Integer(public=True) + field_float = fields.Float(public=True) + field_char = fields.Char(public=True) + field_text = fields.Text(public=True) + field_html = fields.Html(public=True) + field_date = fields.Date(public=True) + field_datetime = fields.Datetime(public=True) + field_selection = fields.Selection(selection=[('a', 'foo'), ('b', 'bar')], public=True) + field_json = JsonDictField(public=True) + + field_many2one = fields.Many2one('runbot.test.model.parent', public=True) + field_one2many = fields.One2many('runbot.test.model.child', 'parent_id', public=True) + field_one2many_private = fields.One2many('runbot.test.model.child.private', 'parent_id', public=True) + field_many2many = fields.Many2many( + 'runbot.test.model.parent', relation='test_model_relation', public=True, + column1='col_1', column2='col_2', + ) + + field_one2many_computed = fields.One2many('runbot.test.model.child', compute='_compute_one2many', public=True) + field_many2many_computed = fields.Many2many('runbot.test.model.parent', compute='_compute_many2many', public=True) + + @api.model + def _api_request_allow_direct_access(self): + return False + + @api.depends() + def _compute_one2many(self): + self.field_one2many_computed = self.env['runbot.test.model.child'].search([]) + + @api.depends() + def _compute_many2many(self): + for rec in self: + rec.field_many2many_computed = self.env['runbot.test.model.parent'].search([ + ('id', '!=', rec.id), + ]) + + +class TestModelChild(models.Model): + _name = 'runbot.test.model.child' + _inherit = ['runbot.public.model.mixin'] + _description = 'child' + + parent_id = fields.Many2one('runbot.test.model.parent', required=True, public=True) + + data = fields.Integer(public=True) + + @api.model + def _api_request_allow_direct_access(self): + return False + + +class TestPrivateModelChild(models.Model): + _name = 'runbot.test.model.child.private' + _description = 'Private Child' + + parent_id = fields.Many2one('runbot.test.model.parent', required=True) + + data = fields.Integer() diff --git a/runbot_test/security/ir.model.access.csv b/runbot_test/security/ir.model.access.csv new file mode 100644 index 00000000..c5df93c3 --- /dev/null +++ b/runbot_test/security/ir.model.access.csv @@ -0,0 +1,8 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +runbot_test.access_runbot_test_model_parent,access_runbot_test_model_parent,runbot_test.model_runbot_test_model_parent,base.group_user,1,0,0,0 +runbot_test.access_runbot_test_model_child,access_runbot_test_model_child,runbot_test.model_runbot_test_model_child,base.group_user,1,0,0,0 +runbot_test.access_runbot_test_model_child_private,access_runbot_test_model_child_private,runbot_test.model_runbot_test_model_child_private,base.group_user,1,0,0,0 + +runbot_test.access_runbot_test_model_parent_admin,access_runbot_test_model_parent_admin,runbot_test.model_runbot_test_model_parent,runbot.group_runbot_admin,1,1,1,1 +runbot_test.access_runbot_test_model_child_admin,access_runbot_test_model_child_admin,runbot_test.model_runbot_test_model_child,runbot.group_runbot_admin,1,1,1,1 +runbot_test.access_runbot_test_model_child_admin_private,access_runbot_test_model_child_admin_private,runbot_test.model_runbot_test_model_child_private,runbot.group_runbot_admin,1,1,1,1 diff --git a/runbot_test/tests/__init__.py b/runbot_test/tests/__init__.py new file mode 100644 index 00000000..8a0a9b3b --- /dev/null +++ b/runbot_test/tests/__init__.py @@ -0,0 +1 @@ +from . import test_spec as test_spec diff --git a/runbot_test/tests/test_spec.py b/runbot_test/tests/test_spec.py new file mode 100644 index 00000000..ff82192e --- /dev/null +++ b/runbot_test/tests/test_spec.py @@ -0,0 +1,295 @@ +from odoo import fields +from odoo.tests.common import TransactionCase, tagged, new_test_user + + +@tagged('-at_install', 'post_install') +class TestSpec(TransactionCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.Parent = cls.env['runbot.test.model.parent'] + cls.Child = cls.env['runbot.test.model.child'] + cls.user_basic = new_test_user( + cls.env, 'basic', + ) + + def test_parent_spec(self): + spec = self.Parent._api_public_specification() + self.assertNotIn('private_field', spec) + + self.assertEqual( + spec['field_bool'], + { + '__type': 'boolean', + } + ) + + self.assertIn('field_many2one', spec) + sub_spec = spec['field_many2one']['fields'] + self.assertIn('field_many2many', spec) + self.assertIn('field_many2many', sub_spec) + + self.assertTrue(self.Parent._api_verify_specification(spec)) + + def test_child_spec(self): + spec = self.Child._api_public_specification() + + self.assertIn('parent_id', spec) + self.assertIn('data', spec) + + sub_spec = spec['parent_id'] + # The reverse relation should not be part of the sub spec, as we already + # traversed that relation + self.assertNotIn('field_one2many', sub_spec) + + self.assertTrue(self.Child._api_verify_specification(spec)) + + def test_parent_read_basic(self): + today = fields.Date.today() + now = fields.Datetime.now() + parent = self.Parent.create({ + 'field_bool': True, + 'field_date': today, + 'field_datetime': now, + 'field_json': {'test': 1} + }) + self.assertEqual( + parent._api_read({'field_bool': {}, 'field_date': {}, 'field_datetime': {}, 'field_json': {}}), + [{ + 'id': parent.id, + 'field_bool': True, + 'field_date': today, + 'field_datetime': now, + 'field_json': {'test': 1} + }] + ) + + def test_parent_read_m2o(self): + parent_1 = self.Parent.create({'field_integer': 1}) + parent_2 = self.Parent.create({'field_integer': 2, 'field_many2one': parent_1.id}) + + # Read without sub spec + self.assertEqual( + parent_1._api_read({'field_many2one': {}}), + [{'id': parent_1.id, 'field_many2one': False}] + ) + self.assertEqual( + parent_2._api_read({'field_many2one': {}}), + [{'id': parent_2.id, 'field_many2one': parent_1.id}] + ) + # Read with sub spec + self.assertEqual( + parent_1._api_read({'field_many2one': {'fields': {'field_integer': {}}}}), + [{'id': parent_1.id, 'field_many2one': False}] + ) + self.assertEqual( + parent_2._api_read({'field_many2one': {'fields': {'field_integer': {}}}}), + [{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_integer': parent_1.field_integer}}] + ) + both = parent_1 | parent_2 + # Read both with sub spec + self.assertEqual( + both._api_read({'field_integer': {}, 'field_many2one': {'fields': {'field_integer': {}}}}), + [ + {'id': parent_1.id, 'field_integer': parent_1.field_integer, 'field_many2one': False}, + {'id': parent_2.id, 'field_integer': parent_2.field_integer, 'field_many2one': { + 'id': parent_1.id, 'field_integer': parent_1.field_integer, + }}, + ] + ) + + def test_parent_read_one2many(self): + parent = self.Parent.create({ + 'field_float': 13.37, + 'field_one2many': [ + (0, 0, {'data': 1}), + (0, 0, {'data': 2}), + (0, 0, {'data': 3}), + ] + }) + parent_no_o2m = self.Parent.create({ + 'field_float': 13.37, + }) + + # Basic read + self.assertEqual( + parent._api_read({ + 'field_float': {}, + 'field_one2many': { + 'fields': { + 'data': {} + }, + } + }), + [{ + 'id': parent.id, + 'field_float': 13.37, + 'field_one2many': [ + {'id': parent.field_one2many[0].id, 'data': 1}, + {'id': parent.field_one2many[1].id, 'data': 2}, + {'id': parent.field_one2many[2].id, 'data': 3}, + ] + }] + ) + self.assertEqual( + parent_no_o2m._api_read({ + 'field_float': {}, + 'field_one2many': { + 'fields': { + 'data': {}, + } + }, + }), + [{ + 'id': parent_no_o2m.id, + 'field_float': 13.37, + 'field_one2many': [], + }] + ) + + # Reading parent_id through field_one2many_computed is allowed since relationship is not known + self.env.invalidate_all() + spec = { + 'field_float': {}, + 'field_one2many': { + 'fields': { + 'data': {}, + 'parent_id': { + 'fields': { + 'field_float': {}, + } + }, + } + }, + } + self.assertEqual( + parent._api_read(spec), + [{ + 'id': parent.id, + 'field_float': 13.37, + 'field_one2many': [ + {'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + ] + }] + ) + # It should not work with basic user + self.assertFalse(parent.with_user(self.user_basic)._api_verify_specification(spec)) + # It should however work with the computed version + spec['field_one2many_computed'] = spec['field_one2many'] + spec.pop('field_one2many') + self.assertEqual( + parent.with_user(self.user_basic)._api_read(spec), + [{ + 'id': parent.id, + 'field_float': 13.37, + 'field_one2many_computed': [ + {'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + ] + }] + ) + + def test_parent_read_one2many_private(self): + # Check the private one2many (field is public, model is not), we can read id but nothing else + parent = self.Parent.create({ + 'field_float': 13.37, + 'field_one2many_private': [ + (0, 0, {'data': 1}), + ] + }).with_user(self.user_basic) + spec = parent._api_public_specification() + self.assertNotIn('fields', spec['field_one2many_private']) + result = parent._api_read(spec) + self.assertEqual( + result[0]['field_one2many_private'], + [parent.field_one2many_private.id], + ) + with self.assertRaises(ValueError): + parent._api_verify_specification({ + 'field_one2many_private': { + 'fields': { + 'data': {} + } + } + }) + + + def test_parent_read_m2m(self): + parent = self.Parent.create({ + 'field_integer': 1, + 'field_many2many': [ + (0, 0, {'field_integer': 2}), + ], + }) + other_parent = parent.field_many2many + self.assertEqual( + parent._api_read({'field_integer': {}, 'field_many2many': {}}), + [ + {'id': parent.id, 'field_many2many': list(parent.field_many2many.ids), 'field_integer': 1}, + ], + ) + self.env.invalidate_all() + spec = { + 'field_many2many_computed': { + 'fields': { + 'field_many2many_computed': { + 'fields': { + 'field_integer': {} + } + } + } + } + } + self.assertEqual( + parent._api_read(spec), + [ + {'id': parent.id, 'field_many2many_computed': [ + {'id': other_parent.id, 'field_many2many_computed': [ + {'id': parent.id, 'field_integer': 1} + ]} + ]} + ] + ) + self.assertFalse(parent.with_user(self.user_basic)._api_verify_specification(spec)) + + def test_parent_read_cyclic_parent(self): + parent_1 = self.Parent.create({ + 'field_integer': 1, + }) + parent_2 = self.Parent.create({ + 'field_integer': 2, + 'field_many2one': parent_1.id, + }) + parent_1.field_many2one = parent_2 + self.env.invalidate_all() + both = (parent_1 | parent_2) + self.assertEqual( + both._api_read({'field_many2one': {}}), + [ + {'id': parent_1.id, 'field_many2one': parent_2.id}, + {'id': parent_2.id, 'field_many2one': parent_1.id}, + ] + ) + self.assertEqual( + both.with_user(self.user_basic)._api_read({'field_many2one': {}}), + [ + {'id': parent_1.id, 'field_many2one': parent_2.id}, + {'id': parent_2.id, 'field_many2one': parent_1.id}, + ] + ) + spec = {'field_many2one': {'fields': {'field_many2one': {}}}} + self.assertEqual( + both._api_read(spec), + [ + {'id': parent_1.id, 'field_many2one': {'id': parent_2.id, 'field_many2one': parent_1.id}}, + {'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_many2one': parent_2.id}}, + ] + ) + self.assertFalse(both.with_user(self.user_basic)._api_verify_specification(spec)) + +