[IMP] runbot: implement public model mixin

The mixin provides tooling for graphql-ish requests with request and
response schemas.
The goal is to safely expose fields through a public api where public
users may request data for a given schema similar to how graphql works.
This commit is contained in:
William Braeckman 2025-03-13 15:15:11 +01:00
parent 9097aa4545
commit f2e79007b9
8 changed files with 559 additions and 0 deletions

View File

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
from . import public_model_mixin
from . import batch
from . import branch
from . import build

View File

@ -0,0 +1,205 @@
from __future__ import annotations
from typing import Dict, Union, List
from odoo import models, api, fields, tools
ResponseSchema = Dict[str, Dict[str, Union[str, 'ResponseSchema']]]
RequestSchema = Dict[str, Union[None, 'RequestSchema']]
ModelData = Dict[str, Union[str, float, bool, int, 'ModelData']]
ResponseData = List[ModelData]
SUPPORTED_FIELD_TYPES = { # Perhaps this should be a list of class instead
'boolean', 'integer', 'float', 'char', 'text', 'html',
'date', 'datetime', 'selection', 'jsonb',
'many2one', 'one2many', 'many2many',
}
SCHEMA_MAX_DEPTH = 10
SCHEMA_METADATA_FIELDS = {'__type', '__relational', '__help'}
def _cleaned_schema(schema: RequestSchema) -> RequestSchema | None:
if schema:
schema = {
k: v for k, v in schema.items()
if k not in SCHEMA_METADATA_FIELDS
}
# Fallback to None if the schema is empty
return schema or None
class PublicModelMixin(models.AbstractModel):
_name = 'runbot.public.model.mixin'
_description = 'Mixin for publicly accessible data'
@api.model
def _valid_field_parameter(self, field: fields.Field, name: str):
if field.type in SUPPORTED_FIELD_TYPES:
return name in (
'public', # boolean, whether the field is readable through the public api
) or super()._valid_field_parameter(field, name)
return super()._valid_field_parameter(field, name)
@api.model
def _get_public_fields(self):
""" Returns a list of publicly readable fields. """
return [
field for field in self._fields.values()
if getattr(field, 'public', None) or field.name == 'id'
]
@api.model
def _get_relation_cache_key(self, field: fields.Field):
""" Returns a cache key for a field, this is used to prevent infinite loops."""
if isinstance(field, fields.Many2one):
return f'{self._name}__{field.name}'
elif isinstance(field, fields.Many2many):
if not field.store:
return f'{self._name}__{field.name}'
return field.relation
elif isinstance(field, fields.One2many):
if not field.store: # is this valid?
return f'{self._name}__{field.name}'
CoModel: PublicModelMixin = self.env[field.comodel_name]
inverse_field = CoModel._fields[field.inverse_name]
return CoModel._get_relation_cache_key(inverse_field)
raise NotImplementedError('Unsupported field')
@tools.ormcache()
@api.model
def _get_public_schema(self) -> RequestSchema:
""" Returns an auto-generated schema according to public fields. """
# We want to prevent infinite loops so we need to track which relations
# have already been explored, this concerns many2one, many2many
def _visit_model(model: PublicModelMixin, visited_relations: set[str], depth = 0):
schema: RequestSchema = {}
for field in model._get_public_fields():
field_metadata = {
'__type': field.type,
'__relational': field.relational,
}
if field.help:
field_metadata['__help'] = field.help
if field.relational and \
PublicModelMixin._name in model.env[field.comodel_name]._inherit:
field_key = model._get_relation_cache_key(field)
if field_key in visited_relations or depth == SCHEMA_MAX_DEPTH:
continue
visited_relations.add(field_key)
CoModel: PublicModelMixin = model.env[field.comodel_name]
field_metadata.update(_visit_model(CoModel, {*visited_relations}, depth + 1))
schema[field.name] = field_metadata
return schema
return _visit_model(self, set())
@api.model
def _verify_schema(self, schema: RequestSchema) -> bool:
"""
Verifies a given schema against the public schema.
This step also provides some validation of the schema, enough that the
schema can be safely used with `_read_schema` if the method does not
raise an exception.
Args:
schema: The requested schema, it should be a dictionary with fields
as keys, and values should be None or a schema for a comodel if
applicable.
Returns:
If the schema matches the public schema this method returns True
otherwise False.
Raises:
ValueError: If a sub schema is given for a non relational field.
"""
public_schema: ResponseSchema = self._get_public_schema()
def _visit_schema(
model_schema: ResponseSchema,
request_schema: RequestSchema,
) -> bool:
for field, sub_schema in request_schema.items():
sub_schema = _cleaned_schema(sub_schema)
if field in SCHEMA_METADATA_FIELDS:
continue
if field not in model_schema:
return False
if sub_schema is None or not sub_schema.get('__relational', False):
continue
if not model_schema[field].get('__relational'):
raise ValueError(
'Invalid sub schema provided for non relational field'
)
if not _visit_schema(model_schema[field], sub_schema):
return False
return True
return _visit_schema(public_schema, schema)
def _read_schema_read_field(self, field: fields.Field):
""" Adapts the type for a given field for schema reads. """
self.ensure_one()
value = self[field.name]
if value is False and field.type != 'boolean':
value = None
if field.type == 'jsonb' and value:
value = value.dict
if field.type in ('date', 'datetime') and value:
value = value.isoformat()
return value
def _read_schema(self, schema: RequestSchema) -> ResponseData:
""" Reads the current recordset according to the requested schema. """
assert isinstance(schema, dict), 'invalid schema'
# We want to prevent infinite loops so we need to track which relations
# have already been explored, this concerns many2one, many2many
def _read_record(
data: ModelData,
record: PublicModelMixin,
request_schema: RequestSchema,
visited_relations: set[str],
):
""" Reads the given record according to the schema inline"""
data['id'] = record.id
for field, sub_schema in request_schema.items():
sub_schema = _cleaned_schema(sub_schema)
if field in SCHEMA_METADATA_FIELDS: # it is authorized to pass the public schema directly
continue
_field = record._fields[field]
if not _field.relational:
data[field] = record._read_schema_read_field(_field)
else:
field_key = record._get_relation_cache_key(_field)
if field_key in visited_relations and not self.env.user.has_group('runbot.group_runbot_admin'):
raise ValueError('Tried to access a relation both ways.')
visited_relations.add(field_key)
if not isinstance(_field, fields._RelationalMulti):
co_record = record[field]
value = None
if co_record and not sub_schema:
value = co_record.id
elif co_record:
value = _read_record(
{},
co_record,
sub_schema,
{*visited_relations},
)
data[field] = value
else:
co_records = record[field]
if not sub_schema:
value = [co_record.id for co_record in co_records]
else:
value = [
_read_record({}, co_record, sub_schema, {*visited_relations})
for co_record in co_records
]
data[field] = value
return data
return [_read_record({}, record, schema, set()) for record in self]

1
runbot_test/__init__.py Normal file
View File

@ -0,0 +1 @@
from . import models as models

View File

@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
{
'name': "runbot test",
'summary': "Runbot test",
'description': "Runbot test",
'author': "Odoo SA",
'website': "http://runbot.odoo.com",
'category': 'Website',
'version': '1.0',
'depends': ['runbot'],
'license': 'LGPL-3',
'data': [
'security/ir.model.access.csv',
],
}

52
runbot_test/models.py Normal file
View File

@ -0,0 +1,52 @@
from odoo import models, api, fields
from odoo.addons.runbot.fields import JsonDictField
class TestModelParent(models.Model):
_name = 'runbot.test.model.parent'
_inherit = ['runbot.public.model.mixin']
_description = 'parent'
private_field = fields.Boolean(public=False)
field_bool = fields.Boolean(public=True)
field_integer = fields.Integer(public=True)
field_float = fields.Float(public=True)
field_char = fields.Char(public=True)
field_text = fields.Text(public=True)
field_html = fields.Html(public=True)
field_date = fields.Date(public=True)
field_datetime = fields.Datetime(public=True)
field_selection = fields.Selection(selection=[('a', 'foo'), ('b', 'bar')], public=True)
field_json = JsonDictField(public=True)
field_many2one = fields.Many2one('runbot.test.model.parent', public=True)
field_one2many = fields.One2many('runbot.test.model.child', 'parent_id', public=True)
field_many2many = fields.Many2many(
'runbot.test.model.parent', relation='test_model_relation', public=True,
column1='col_1', column2='col_2',
)
field_one2many_computed = fields.One2many('runbot.test.model.child', compute='_compute_one2many', public=True)
field_many2many_computed = fields.Many2many('runbot.test.model.parent', compute='_compute_many2many', public=True)
@api.depends()
def _compute_one2many(self):
self.field_one2many_computed = self.env['runbot.test.model.child'].search([])
@api.depends()
def _compute_many2many(self):
for rec in self:
rec.field_many2many_computed = self.env['runbot.test.model.parent'].search([
('id', '!=', rec.id),
])
class TestModelChild(models.Model):
_name = 'runbot.test.model.child'
_inherit = ['runbot.public.model.mixin']
_description = 'child'
parent_id = fields.Many2one('runbot.test.model.parent', required=True, public=True)
data = fields.Integer(public=True)

View File

@ -0,0 +1,6 @@
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
runbot_test.access_runbot_test_model_parent,access_runbot_test_model_parent,runbot_test.model_runbot_test_model_parent,base.group_user,1,0,0,0
runbot_test.access_runbot_test_model_child,access_runbot_test_model_child,runbot_test.model_runbot_test_model_child,base.group_user,1,0,0,0
runbot_test.access_runbot_test_model_parent_admin,access_runbot_test_model_parent_admin,runbot_test.model_runbot_test_model_parent,runbot.group_runbot_admin,1,1,1,1
runbot_test.access_runbot_test_model_child_admin,access_runbot_test_model_child_admin,runbot_test.model_runbot_test_model_child,runbot.group_runbot_admin,1,1,1,1
1 id name model_id:id group_id:id perm_read perm_write perm_create perm_unlink
2 runbot_test.access_runbot_test_model_parent access_runbot_test_model_parent runbot_test.model_runbot_test_model_parent base.group_user 1 0 0 0
3 runbot_test.access_runbot_test_model_child access_runbot_test_model_child runbot_test.model_runbot_test_model_child base.group_user 1 0 0 0
4 runbot_test.access_runbot_test_model_parent_admin access_runbot_test_model_parent_admin runbot_test.model_runbot_test_model_parent runbot.group_runbot_admin 1 1 1 1
5 runbot_test.access_runbot_test_model_child_admin access_runbot_test_model_child_admin runbot_test.model_runbot_test_model_child runbot.group_runbot_admin 1 1 1 1

View File

@ -0,0 +1 @@
from . import test_schema as test_schema

View File

@ -0,0 +1,277 @@
from odoo import fields
from odoo.tests.common import TransactionCase, tagged, new_test_user
@tagged('-at_install', 'post_install')
class TestSchema(TransactionCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.Parent = cls.env['runbot.test.model.parent']
cls.Child = cls.env['runbot.test.model.child']
cls.user_basic = new_test_user(
cls.env, 'basic',
)
def test_parent_schema(self):
schema = self.Parent._get_public_schema()
self.assertNotIn('private_field', schema)
self.assertEqual(
schema['field_bool'],
{
'__type': 'boolean',
'__relational': False,
}
)
self.assertIn('field_many2one', schema)
sub_schema = schema['field_many2one']
self.assertIn('field_many2many', schema)
self.assertIn('field_many2many', sub_schema)
self.assertTrue(self.Parent._verify_schema(schema))
def test_child_schema(self):
schema = self.Child._get_public_schema()
self.assertIn('parent_id', schema)
self.assertIn('data', schema)
sub_schema = schema['parent_id']
# The reverse relation should not be part of the sub schema, as we already
# traversed that relation
self.assertNotIn('field_one2many', sub_schema)
self.assertTrue(self.Child._verify_schema(schema))
def test_parent_read_basic(self):
today = fields.Date.today()
now = fields.Datetime.now()
parent = self.Parent.create({
'field_bool': True,
'field_date': today,
'field_datetime': now,
'field_json': {'test': 1}
})
self.assertEqual(
parent._read_schema({'field_bool': None, 'field_date': None, 'field_datetime': None, 'field_json': None}),
[{
'id': parent.id,
'field_bool': True,
'field_date': today.isoformat(),
'field_datetime': now.isoformat(),
'field_json': {'test': 1}
}]
)
def test_parent_read_m2o(self):
parent_1 = self.Parent.create({'field_integer': 1})
parent_2 = self.Parent.create({'field_integer': 2, 'field_many2one': parent_1.id})
# Read without sub schema
self.assertEqual(
parent_1._read_schema({'field_many2one': None}),
[{'id': parent_1.id, 'field_many2one': None}]
)
self.assertEqual(
parent_2._read_schema({'field_many2one': None}),
[{'id': parent_2.id, 'field_many2one': parent_1.id}]
)
# Read with sub schema
self.assertEqual(
parent_1._read_schema({'field_many2one': {'field_integer': None}}),
[{'id': parent_1.id, 'field_many2one': None}]
)
self.assertEqual(
parent_2._read_schema({'field_many2one': {'field_integer': None}}),
[{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_integer': parent_1.field_integer}}]
)
both = parent_1 | parent_2
# Read both with sub schema
self.assertEqual(
both._read_schema({'field_integer': None, 'field_many2one': {'field_integer': None}}),
[
{'id': parent_1.id, 'field_integer': parent_1.field_integer, 'field_many2one': None},
{'id': parent_2.id, 'field_integer': parent_2.field_integer, 'field_many2one': {
'id': parent_1.id, 'field_integer': parent_1.field_integer,
}},
]
)
def test_parent_read_one2many(self):
parent = self.Parent.create({
'field_float': 13.37,
'field_one2many': [
(0, 0, {'data': 1}),
(0, 0, {'data': 2}),
(0, 0, {'data': 3}),
]
})
parent_no_o2m = self.Parent.create({
'field_float': 13.37,
})
# Basic read
self.assertEqual(
parent._read_schema({
'field_float': None,
'field_one2many': {
'data': None
}
}),
[{
'id': parent.id,
'field_float': 13.37,
'field_one2many': [
{'id': parent.field_one2many[0].id, 'data': 1},
{'id': parent.field_one2many[1].id, 'data': 2},
{'id': parent.field_one2many[2].id, 'data': 3},
]
}]
)
self.assertEqual(
parent_no_o2m._read_schema({
'field_float': None,
'field_one2many': {
'data': None,
},
}),
[{
'id': parent_no_o2m.id,
'field_float': 13.37,
'field_one2many': [],
}]
)
# Reading parent_id through field_one2many_computed is allowed since relationship is not known
self.env.invalidate_all()
schema = {
'field_float': None,
'field_one2many': {
'data': None,
'parent_id': {
'field_float': None,
},
},
}
self.assertEqual(
parent._read_schema(schema),
[{
'id': parent.id,
'field_float': 13.37,
'field_one2many': [
{'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
{'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
{'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
]
}]
)
# It should not work with basic user
with self.assertRaises(ValueError):
parent.with_user(self.user_basic)._read_schema(schema)
# It should however work with the computed version
schema['field_one2many_computed'] = schema['field_one2many']
schema.pop('field_one2many')
self.assertEqual(
parent.with_user(self.user_basic)._read_schema(schema),
[{
'id': parent.id,
'field_float': 13.37,
'field_one2many_computed': [
{'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
{'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
{'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
]
}]
)
def test_parent_read_m2m(self):
parent = self.Parent.create({
'field_integer': 1,
'field_many2many': [
(0, 0, {'field_integer': 2}),
],
})
other_parent = parent.field_many2many
self.assertEqual(
parent._read_schema({'field_integer': None, 'field_many2many': None}),
[
{'id': parent.id, 'field_many2many': list(parent.field_many2many.ids), 'field_integer': 1},
],
)
self.env.invalidate_all()
schema = {
'field_many2many_computed': {
'field_many2many_computed': {
'field_integer': None
}
}
}
self.assertEqual(
parent._read_schema(schema),
[
{'id': parent.id, 'field_many2many_computed': [
{'id': other_parent.id, 'field_many2many_computed': [
{'id': parent.id, 'field_integer': 1}
]}
]}
]
)
with self.assertRaises(ValueError):
self.assertEqual(
parent.with_user(self.user_basic)._read_schema(schema),
[
{'id': parent.id, 'field_many2many_computed': [
{'id': other_parent.id, 'field_many2many_computed': [
{'id': parent.id, 'field_integer': 1}
]}
]}
]
)
def test_parent_read_cyclic_parent(self):
parent_1 = self.Parent.create({
'field_integer': 1,
})
parent_2 = self.Parent.create({
'field_integer': 2,
'field_many2one': parent_1.id,
})
parent_1.field_many2one = parent_2
self.env.invalidate_all()
both = (parent_1 | parent_2)
self.assertEqual(
both._read_schema({'field_many2one': None}),
[
{'id': parent_1.id, 'field_many2one': parent_2.id},
{'id': parent_2.id, 'field_many2one': parent_1.id},
]
)
self.assertEqual(
both.with_user(self.user_basic)._read_schema({'field_many2one': None}),
[
{'id': parent_1.id, 'field_many2one': parent_2.id},
{'id': parent_2.id, 'field_many2one': parent_1.id},
]
)
schema = {'field_many2one': {'field_many2one': None}}
self.assertEqual(
both._read_schema(schema),
[
{'id': parent_1.id, 'field_many2one': {'id': parent_2.id, 'field_many2one': parent_1.id}},
{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_many2one': parent_2.id}},
]
)
with self.assertRaises(ValueError):
self.assertEqual(
both.with_user(self.user_basic)._read_schema(schema),
[
{'id': parent_1.id, 'field_many2one': {'id': parent_2.id, 'field_many2one': parent_1.id}},
{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_many2one': parent_2.id}},
]
)