From f2e79007b9d1bfac72a0ed5df7cec00c93313519 Mon Sep 17 00:00:00 2001 From: William Braeckman Date: Thu, 13 Mar 2025 15:15:11 +0100 Subject: [PATCH] [IMP] runbot: implement public model mixin The mixin provides tooling for graphql-ish requests with request and response schemas. The goal is to safely expose fields through a public api where public users may request data for a given schema similar to how graphql works. --- runbot/models/__init__.py | 2 + runbot/models/public_model_mixin.py | 205 +++++++++++++++++ runbot_test/__init__.py | 1 + runbot_test/__manifest__.py | 15 ++ runbot_test/models.py | 52 +++++ runbot_test/security/ir.model.access.csv | 6 + runbot_test/tests/__init__.py | 1 + runbot_test/tests/test_schema.py | 277 +++++++++++++++++++++++ 8 files changed, 559 insertions(+) create mode 100644 runbot/models/public_model_mixin.py create mode 100644 runbot_test/__init__.py create mode 100644 runbot_test/__manifest__.py create mode 100644 runbot_test/models.py create mode 100644 runbot_test/security/ir.model.access.csv create mode 100644 runbot_test/tests/__init__.py create mode 100644 runbot_test/tests/test_schema.py diff --git a/runbot/models/__init__.py b/runbot/models/__init__.py index dbd376be..c755616b 100644 --- a/runbot/models/__init__.py +++ b/runbot/models/__init__.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- +from . import public_model_mixin + from . import batch from . import branch from . import build diff --git a/runbot/models/public_model_mixin.py b/runbot/models/public_model_mixin.py new file mode 100644 index 00000000..4e1d2c0f --- /dev/null +++ b/runbot/models/public_model_mixin.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +from typing import Dict, Union, List + +from odoo import models, api, fields, tools + + +ResponseSchema = Dict[str, Dict[str, Union[str, 'ResponseSchema']]] +RequestSchema = Dict[str, Union[None, 'RequestSchema']] +ModelData = Dict[str, Union[str, float, bool, int, 'ModelData']] +ResponseData = List[ModelData] + +SUPPORTED_FIELD_TYPES = { # Perhaps this should be a list of class instead + 'boolean', 'integer', 'float', 'char', 'text', 'html', + 'date', 'datetime', 'selection', 'jsonb', + 'many2one', 'one2many', 'many2many', +} +SCHEMA_MAX_DEPTH = 10 +SCHEMA_METADATA_FIELDS = {'__type', '__relational', '__help'} + +def _cleaned_schema(schema: RequestSchema) -> RequestSchema | None: + if schema: + schema = { + k: v for k, v in schema.items() + if k not in SCHEMA_METADATA_FIELDS + } + # Fallback to None if the schema is empty + return schema or None + +class PublicModelMixin(models.AbstractModel): + _name = 'runbot.public.model.mixin' + _description = 'Mixin for publicly accessible data' + + @api.model + def _valid_field_parameter(self, field: fields.Field, name: str): + if field.type in SUPPORTED_FIELD_TYPES: + return name in ( + 'public', # boolean, whether the field is readable through the public api + ) or super()._valid_field_parameter(field, name) + return super()._valid_field_parameter(field, name) + + @api.model + def _get_public_fields(self): + """ Returns a list of publicly readable fields. """ + return [ + field for field in self._fields.values() + if getattr(field, 'public', None) or field.name == 'id' + ] + + @api.model + def _get_relation_cache_key(self, field: fields.Field): + """ Returns a cache key for a field, this is used to prevent infinite loops.""" + if isinstance(field, fields.Many2one): + return f'{self._name}__{field.name}' + elif isinstance(field, fields.Many2many): + if not field.store: + return f'{self._name}__{field.name}' + return field.relation + elif isinstance(field, fields.One2many): + if not field.store: # is this valid? + return f'{self._name}__{field.name}' + CoModel: PublicModelMixin = self.env[field.comodel_name] + inverse_field = CoModel._fields[field.inverse_name] + return CoModel._get_relation_cache_key(inverse_field) + raise NotImplementedError('Unsupported field') + + @tools.ormcache() + @api.model + def _get_public_schema(self) -> RequestSchema: + """ Returns an auto-generated schema according to public fields. """ + # We want to prevent infinite loops so we need to track which relations + # have already been explored, this concerns many2one, many2many + def _visit_model(model: PublicModelMixin, visited_relations: set[str], depth = 0): + schema: RequestSchema = {} + for field in model._get_public_fields(): + field_metadata = { + '__type': field.type, + '__relational': field.relational, + } + if field.help: + field_metadata['__help'] = field.help + if field.relational and \ + PublicModelMixin._name in model.env[field.comodel_name]._inherit: + field_key = model._get_relation_cache_key(field) + if field_key in visited_relations or depth == SCHEMA_MAX_DEPTH: + continue + visited_relations.add(field_key) + CoModel: PublicModelMixin = model.env[field.comodel_name] + field_metadata.update(_visit_model(CoModel, {*visited_relations}, depth + 1)) + schema[field.name] = field_metadata + return schema + + return _visit_model(self, set()) + + @api.model + def _verify_schema(self, schema: RequestSchema) -> bool: + """ + Verifies a given schema against the public schema. + + This step also provides some validation of the schema, enough that the + schema can be safely used with `_read_schema` if the method does not + raise an exception. + + Args: + schema: The requested schema, it should be a dictionary with fields + as keys, and values should be None or a schema for a comodel if + applicable. + + Returns: + If the schema matches the public schema this method returns True + otherwise False. + + Raises: + ValueError: If a sub schema is given for a non relational field. + """ + public_schema: ResponseSchema = self._get_public_schema() + + def _visit_schema( + model_schema: ResponseSchema, + request_schema: RequestSchema, + ) -> bool: + for field, sub_schema in request_schema.items(): + sub_schema = _cleaned_schema(sub_schema) + if field in SCHEMA_METADATA_FIELDS: + continue + if field not in model_schema: + return False + if sub_schema is None or not sub_schema.get('__relational', False): + continue + if not model_schema[field].get('__relational'): + raise ValueError( + 'Invalid sub schema provided for non relational field' + ) + if not _visit_schema(model_schema[field], sub_schema): + return False + return True + + return _visit_schema(public_schema, schema) + + def _read_schema_read_field(self, field: fields.Field): + """ Adapts the type for a given field for schema reads. """ + self.ensure_one() + + value = self[field.name] + if value is False and field.type != 'boolean': + value = None + if field.type == 'jsonb' and value: + value = value.dict + if field.type in ('date', 'datetime') and value: + value = value.isoformat() + + return value + + def _read_schema(self, schema: RequestSchema) -> ResponseData: + """ Reads the current recordset according to the requested schema. """ + assert isinstance(schema, dict), 'invalid schema' + + # We want to prevent infinite loops so we need to track which relations + # have already been explored, this concerns many2one, many2many + def _read_record( + data: ModelData, + record: PublicModelMixin, + request_schema: RequestSchema, + visited_relations: set[str], + ): + """ Reads the given record according to the schema inline""" + data['id'] = record.id + for field, sub_schema in request_schema.items(): + sub_schema = _cleaned_schema(sub_schema) + if field in SCHEMA_METADATA_FIELDS: # it is authorized to pass the public schema directly + continue + _field = record._fields[field] + if not _field.relational: + data[field] = record._read_schema_read_field(_field) + else: + field_key = record._get_relation_cache_key(_field) + if field_key in visited_relations and not self.env.user.has_group('runbot.group_runbot_admin'): + raise ValueError('Tried to access a relation both ways.') + visited_relations.add(field_key) + if not isinstance(_field, fields._RelationalMulti): + co_record = record[field] + value = None + if co_record and not sub_schema: + value = co_record.id + elif co_record: + value = _read_record( + {}, + co_record, + sub_schema, + {*visited_relations}, + ) + data[field] = value + else: + co_records = record[field] + if not sub_schema: + value = [co_record.id for co_record in co_records] + else: + value = [ + _read_record({}, co_record, sub_schema, {*visited_relations}) + for co_record in co_records + ] + data[field] = value + return data + + return [_read_record({}, record, schema, set()) for record in self] diff --git a/runbot_test/__init__.py b/runbot_test/__init__.py new file mode 100644 index 00000000..253f5724 --- /dev/null +++ b/runbot_test/__init__.py @@ -0,0 +1 @@ +from . import models as models diff --git a/runbot_test/__manifest__.py b/runbot_test/__manifest__.py new file mode 100644 index 00000000..9d4c9a71 --- /dev/null +++ b/runbot_test/__manifest__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +{ + 'name': "runbot test", + 'summary': "Runbot test", + 'description': "Runbot test", + 'author': "Odoo SA", + 'website': "http://runbot.odoo.com", + 'category': 'Website', + 'version': '1.0', + 'depends': ['runbot'], + 'license': 'LGPL-3', + 'data': [ + 'security/ir.model.access.csv', + ], +} diff --git a/runbot_test/models.py b/runbot_test/models.py new file mode 100644 index 00000000..00e9baa7 --- /dev/null +++ b/runbot_test/models.py @@ -0,0 +1,52 @@ +from odoo import models, api, fields +from odoo.addons.runbot.fields import JsonDictField + + +class TestModelParent(models.Model): + _name = 'runbot.test.model.parent' + _inherit = ['runbot.public.model.mixin'] + _description = 'parent' + + private_field = fields.Boolean(public=False) + + field_bool = fields.Boolean(public=True) + field_integer = fields.Integer(public=True) + field_float = fields.Float(public=True) + field_char = fields.Char(public=True) + field_text = fields.Text(public=True) + field_html = fields.Html(public=True) + field_date = fields.Date(public=True) + field_datetime = fields.Datetime(public=True) + field_selection = fields.Selection(selection=[('a', 'foo'), ('b', 'bar')], public=True) + field_json = JsonDictField(public=True) + + field_many2one = fields.Many2one('runbot.test.model.parent', public=True) + field_one2many = fields.One2many('runbot.test.model.child', 'parent_id', public=True) + field_many2many = fields.Many2many( + 'runbot.test.model.parent', relation='test_model_relation', public=True, + column1='col_1', column2='col_2', + ) + + field_one2many_computed = fields.One2many('runbot.test.model.child', compute='_compute_one2many', public=True) + field_many2many_computed = fields.Many2many('runbot.test.model.parent', compute='_compute_many2many', public=True) + + @api.depends() + def _compute_one2many(self): + self.field_one2many_computed = self.env['runbot.test.model.child'].search([]) + + @api.depends() + def _compute_many2many(self): + for rec in self: + rec.field_many2many_computed = self.env['runbot.test.model.parent'].search([ + ('id', '!=', rec.id), + ]) + + +class TestModelChild(models.Model): + _name = 'runbot.test.model.child' + _inherit = ['runbot.public.model.mixin'] + _description = 'child' + + parent_id = fields.Many2one('runbot.test.model.parent', required=True, public=True) + + data = fields.Integer(public=True) diff --git a/runbot_test/security/ir.model.access.csv b/runbot_test/security/ir.model.access.csv new file mode 100644 index 00000000..19c6dd3c --- /dev/null +++ b/runbot_test/security/ir.model.access.csv @@ -0,0 +1,6 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +runbot_test.access_runbot_test_model_parent,access_runbot_test_model_parent,runbot_test.model_runbot_test_model_parent,base.group_user,1,0,0,0 +runbot_test.access_runbot_test_model_child,access_runbot_test_model_child,runbot_test.model_runbot_test_model_child,base.group_user,1,0,0,0 + +runbot_test.access_runbot_test_model_parent_admin,access_runbot_test_model_parent_admin,runbot_test.model_runbot_test_model_parent,runbot.group_runbot_admin,1,1,1,1 +runbot_test.access_runbot_test_model_child_admin,access_runbot_test_model_child_admin,runbot_test.model_runbot_test_model_child,runbot.group_runbot_admin,1,1,1,1 diff --git a/runbot_test/tests/__init__.py b/runbot_test/tests/__init__.py new file mode 100644 index 00000000..f3c0cf3c --- /dev/null +++ b/runbot_test/tests/__init__.py @@ -0,0 +1 @@ +from . import test_schema as test_schema diff --git a/runbot_test/tests/test_schema.py b/runbot_test/tests/test_schema.py new file mode 100644 index 00000000..804bd10d --- /dev/null +++ b/runbot_test/tests/test_schema.py @@ -0,0 +1,277 @@ +from odoo import fields +from odoo.tests.common import TransactionCase, tagged, new_test_user + + +@tagged('-at_install', 'post_install') +class TestSchema(TransactionCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.Parent = cls.env['runbot.test.model.parent'] + cls.Child = cls.env['runbot.test.model.child'] + cls.user_basic = new_test_user( + cls.env, 'basic', + ) + + def test_parent_schema(self): + schema = self.Parent._get_public_schema() + self.assertNotIn('private_field', schema) + + self.assertEqual( + schema['field_bool'], + { + '__type': 'boolean', + '__relational': False, + } + ) + + self.assertIn('field_many2one', schema) + sub_schema = schema['field_many2one'] + self.assertIn('field_many2many', schema) + self.assertIn('field_many2many', sub_schema) + + self.assertTrue(self.Parent._verify_schema(schema)) + + def test_child_schema(self): + schema = self.Child._get_public_schema() + + self.assertIn('parent_id', schema) + self.assertIn('data', schema) + + sub_schema = schema['parent_id'] + # The reverse relation should not be part of the sub schema, as we already + # traversed that relation + self.assertNotIn('field_one2many', sub_schema) + + self.assertTrue(self.Child._verify_schema(schema)) + + def test_parent_read_basic(self): + today = fields.Date.today() + now = fields.Datetime.now() + parent = self.Parent.create({ + 'field_bool': True, + 'field_date': today, + 'field_datetime': now, + 'field_json': {'test': 1} + }) + self.assertEqual( + parent._read_schema({'field_bool': None, 'field_date': None, 'field_datetime': None, 'field_json': None}), + [{ + 'id': parent.id, + 'field_bool': True, + 'field_date': today.isoformat(), + 'field_datetime': now.isoformat(), + 'field_json': {'test': 1} + }] + ) + + def test_parent_read_m2o(self): + parent_1 = self.Parent.create({'field_integer': 1}) + parent_2 = self.Parent.create({'field_integer': 2, 'field_many2one': parent_1.id}) + + # Read without sub schema + self.assertEqual( + parent_1._read_schema({'field_many2one': None}), + [{'id': parent_1.id, 'field_many2one': None}] + ) + self.assertEqual( + parent_2._read_schema({'field_many2one': None}), + [{'id': parent_2.id, 'field_many2one': parent_1.id}] + ) + # Read with sub schema + self.assertEqual( + parent_1._read_schema({'field_many2one': {'field_integer': None}}), + [{'id': parent_1.id, 'field_many2one': None}] + ) + self.assertEqual( + parent_2._read_schema({'field_many2one': {'field_integer': None}}), + [{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_integer': parent_1.field_integer}}] + ) + both = parent_1 | parent_2 + # Read both with sub schema + self.assertEqual( + both._read_schema({'field_integer': None, 'field_many2one': {'field_integer': None}}), + [ + {'id': parent_1.id, 'field_integer': parent_1.field_integer, 'field_many2one': None}, + {'id': parent_2.id, 'field_integer': parent_2.field_integer, 'field_many2one': { + 'id': parent_1.id, 'field_integer': parent_1.field_integer, + }}, + ] + ) + + def test_parent_read_one2many(self): + parent = self.Parent.create({ + 'field_float': 13.37, + 'field_one2many': [ + (0, 0, {'data': 1}), + (0, 0, {'data': 2}), + (0, 0, {'data': 3}), + ] + }) + parent_no_o2m = self.Parent.create({ + 'field_float': 13.37, + }) + + # Basic read + self.assertEqual( + parent._read_schema({ + 'field_float': None, + 'field_one2many': { + 'data': None + } + }), + [{ + 'id': parent.id, + 'field_float': 13.37, + 'field_one2many': [ + {'id': parent.field_one2many[0].id, 'data': 1}, + {'id': parent.field_one2many[1].id, 'data': 2}, + {'id': parent.field_one2many[2].id, 'data': 3}, + ] + }] + ) + self.assertEqual( + parent_no_o2m._read_schema({ + 'field_float': None, + 'field_one2many': { + 'data': None, + }, + }), + [{ + 'id': parent_no_o2m.id, + 'field_float': 13.37, + 'field_one2many': [], + }] + ) + + # Reading parent_id through field_one2many_computed is allowed since relationship is not known + self.env.invalidate_all() + schema = { + 'field_float': None, + 'field_one2many': { + 'data': None, + 'parent_id': { + 'field_float': None, + }, + }, + } + self.assertEqual( + parent._read_schema(schema), + [{ + 'id': parent.id, + 'field_float': 13.37, + 'field_one2many': [ + {'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + ] + }] + ) + # It should not work with basic user + with self.assertRaises(ValueError): + parent.with_user(self.user_basic)._read_schema(schema) + # It should however work with the computed version + schema['field_one2many_computed'] = schema['field_one2many'] + schema.pop('field_one2many') + self.assertEqual( + parent.with_user(self.user_basic)._read_schema(schema), + [{ + 'id': parent.id, + 'field_float': 13.37, + 'field_one2many_computed': [ + {'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + {'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}}, + ] + }] + ) + + def test_parent_read_m2m(self): + parent = self.Parent.create({ + 'field_integer': 1, + 'field_many2many': [ + (0, 0, {'field_integer': 2}), + ], + }) + other_parent = parent.field_many2many + self.assertEqual( + parent._read_schema({'field_integer': None, 'field_many2many': None}), + [ + {'id': parent.id, 'field_many2many': list(parent.field_many2many.ids), 'field_integer': 1}, + ], + ) + self.env.invalidate_all() + schema = { + 'field_many2many_computed': { + 'field_many2many_computed': { + 'field_integer': None + } + } + } + self.assertEqual( + parent._read_schema(schema), + [ + {'id': parent.id, 'field_many2many_computed': [ + {'id': other_parent.id, 'field_many2many_computed': [ + {'id': parent.id, 'field_integer': 1} + ]} + ]} + ] + ) + with self.assertRaises(ValueError): + self.assertEqual( + parent.with_user(self.user_basic)._read_schema(schema), + [ + {'id': parent.id, 'field_many2many_computed': [ + {'id': other_parent.id, 'field_many2many_computed': [ + {'id': parent.id, 'field_integer': 1} + ]} + ]} + ] + ) + + def test_parent_read_cyclic_parent(self): + parent_1 = self.Parent.create({ + 'field_integer': 1, + }) + parent_2 = self.Parent.create({ + 'field_integer': 2, + 'field_many2one': parent_1.id, + }) + parent_1.field_many2one = parent_2 + self.env.invalidate_all() + both = (parent_1 | parent_2) + self.assertEqual( + both._read_schema({'field_many2one': None}), + [ + {'id': parent_1.id, 'field_many2one': parent_2.id}, + {'id': parent_2.id, 'field_many2one': parent_1.id}, + ] + ) + self.assertEqual( + both.with_user(self.user_basic)._read_schema({'field_many2one': None}), + [ + {'id': parent_1.id, 'field_many2one': parent_2.id}, + {'id': parent_2.id, 'field_many2one': parent_1.id}, + ] + ) + schema = {'field_many2one': {'field_many2one': None}} + self.assertEqual( + both._read_schema(schema), + [ + {'id': parent_1.id, 'field_many2one': {'id': parent_2.id, 'field_many2one': parent_1.id}}, + {'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_many2one': parent_2.id}}, + ] + ) + with self.assertRaises(ValueError): + self.assertEqual( + both.with_user(self.user_basic)._read_schema(schema), + [ + {'id': parent_1.id, 'field_many2one': {'id': parent_2.id, 'field_many2one': parent_1.id}}, + {'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_many2one': parent_2.id}}, + ] + ) + +