mirror of
https://github.com/odoo/runbot.git
synced 2025-03-20 01:45:49 +07:00
[IMP] runbot: implement public api mixin
Until now api routes on runbot have been created through custom website pages in production. We want to unify the API by making a 'public' api, inspired by the way `web_search_read` works. This commit adds: - A route to list all publicly available models - A route to do a read on a public model - A route to fetch the publicly available specification for a model - A public model mixin that provides all the tools required to support the above mentionned routes. The mixin adds the ability to add the `public` attribute on fields. Any field marked as public can then be publicly queried through the controller. Relational fields work in a nested manner (`fields` key in the field's sub-specification) (up to a depth of 10). The public api does not allow going through a relationship back and front (parent->child->parent is NOT allowed). Because we are based on `web_search_read`, we heavily focus on validating the specification, for security reasons, and offset the load of reading to the `web_read` function (we currently don't provide limit metadata).
This commit is contained in:
parent
3cf9dd6fa2
commit
b51909fab6
@ -3,3 +3,4 @@
|
||||
from . import frontend
|
||||
from . import hook
|
||||
from . import badge
|
||||
from . import public_api
|
||||
|
87
runbot/controllers/public_api.py
Normal file
87
runbot/controllers/public_api.py
Normal file
@ -0,0 +1,87 @@
|
||||
import json
|
||||
|
||||
from werkzeug.exceptions import BadRequest, Forbidden
|
||||
|
||||
from odoo.exceptions import AccessError
|
||||
from odoo.http import Controller, request, route
|
||||
from odoo.tools import mute_logger
|
||||
|
||||
from odoo.addons.runbot.models.public_model_mixin import PublicModelMixin
|
||||
|
||||
|
||||
class PublicApi(Controller):
|
||||
|
||||
@mute_logger('odoo.addons.base.models.ir_model') # We don't care about logging acl errors
|
||||
def _get_model(self, model: str) -> PublicModelMixin:
|
||||
"""
|
||||
Returns the model from a model string.
|
||||
|
||||
Raises the appropriate exception if:
|
||||
- The model does not exist
|
||||
- The model is not a public model
|
||||
- The current user can not read the model
|
||||
"""
|
||||
pool = request.env.registry
|
||||
try:
|
||||
Model = pool[model]
|
||||
except KeyError:
|
||||
raise BadRequest('Unknown model')
|
||||
if not issubclass(Model, pool['runbot.public.model.mixin']):
|
||||
raise BadRequest('Unknown model')
|
||||
Model = request.env[model]
|
||||
Model.check_access('read')
|
||||
if not Model._api_request_allow_direct_access():
|
||||
raise Forbidden('This model does not allow direct access')
|
||||
return Model
|
||||
|
||||
@route('/runbot/api/models', auth='public', methods=['GET'], readonly=True)
|
||||
def models(self):
|
||||
models = []
|
||||
for model in request.env.keys():
|
||||
try:
|
||||
models.append(self._get_model(model))
|
||||
except (BadRequest, AccessError, Forbidden):
|
||||
pass
|
||||
return request.make_json_response(
|
||||
[Model._name for Model in models]
|
||||
)
|
||||
|
||||
@route('/runbot/api/<model>/read', auth='public', methods=['POST'], readonly=True, csrf=False)
|
||||
def read(self, *, model: str):
|
||||
Model = self._get_model(model)
|
||||
required_keys = Model._api_request_required_keys()
|
||||
allowed_keys = Model._api_request_allowed_keys()
|
||||
try:
|
||||
data = request.get_json_data()
|
||||
except json.JSONDecodeError:
|
||||
raise BadRequest('Invalid payload, missing or malformed json')
|
||||
if not isinstance(data, dict):
|
||||
raise BadRequest('Invalid payload, should be a dict.')
|
||||
if (missing_keys := required_keys - set(data.keys())):
|
||||
raise BadRequest(f'Invalid payload, missing keys: {", ".join(missing_keys)}')
|
||||
if (unknown_keys := set(data.keys()) - allowed_keys):
|
||||
raise BadRequest(f'Invalid payload, unknown keys: {", ".join(unknown_keys)}')
|
||||
if Model._api_request_requires_project():
|
||||
if not isinstance(data['project_id'], int):
|
||||
raise BadRequest('Invalid project_id, should be an int')
|
||||
# This is an additional layer of protection for project_id
|
||||
project = request.env['runbot.project'].browse(data['project_id']).exists()
|
||||
if not project:
|
||||
raise BadRequest('Unknown project_id')
|
||||
project.check_access('read')
|
||||
Model = Model.with_context(project_id=project.id)
|
||||
return request.make_json_response(Model._api_request_read(data))
|
||||
|
||||
@route('/runbot/api/<model>/spec', auth='public', methods=['GET'], readonly=True)
|
||||
def spec(self, *, model: str):
|
||||
Model = self._get_model(model)
|
||||
required_keys = Model._api_request_required_keys()
|
||||
allowed_keys = Model._api_request_allowed_keys()
|
||||
return request.make_json_response({
|
||||
'requires_project': Model._api_request_requires_project(),
|
||||
'default_page_size': Model._api_request_default_limit(),
|
||||
'max_page_size': Model._api_request_max_limit(),
|
||||
'required_keys': list(Model._api_request_required_keys()),
|
||||
'allowed_keys': list(allowed_keys - required_keys),
|
||||
'specification': self._get_model(model)._api_public_specification(),
|
||||
})
|
@ -1,5 +1,7 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from . import public_model_mixin
|
||||
|
||||
from . import batch
|
||||
from . import branch
|
||||
from . import build
|
||||
|
315
runbot/models/public_model_mixin.py
Normal file
315
runbot/models/public_model_mixin.py
Normal file
@ -0,0 +1,315 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from werkzeug.exceptions import BadRequest, Forbidden
|
||||
|
||||
from typing import Dict, Union, List, Self, TypedDict
|
||||
|
||||
from odoo import models, api, fields, tools
|
||||
from odoo.osv import expression
|
||||
|
||||
|
||||
class SubSpecification(TypedDict):
|
||||
context: Dict
|
||||
fields: Dict[str, 'SubSpecification']
|
||||
Specification = Dict[str, Union[Dict, 'SubSpecification']]
|
||||
|
||||
SUPPORTED_FIELD_TYPES = { # Perhaps this should be a list of class instead
|
||||
'boolean', 'integer', 'float', 'char', 'text', 'html',
|
||||
'date', 'datetime', 'selection', 'jsonb',
|
||||
'many2one', 'one2many', 'many2many',
|
||||
}
|
||||
RELATIONAL_FIELD_TYPES = {'many2one', 'one2many', 'many2many'}
|
||||
SPEC_MAX_DEPTH = 10
|
||||
SPEC_METADATA_FIELD = {
|
||||
'__type', '__help',
|
||||
}
|
||||
DEFAULT_LIMIT = 20
|
||||
DEFAULT_MAX_LIMIT = 60
|
||||
|
||||
def _cleaned_spec(spec: Specification | SubSpecification) -> Specification | SubSpecification:
|
||||
""" Returns the specification without metadata fields. """
|
||||
if not isinstance(spec, dict):
|
||||
return spec
|
||||
return {
|
||||
k: v for k, v in spec.items()
|
||||
if k not in SPEC_METADATA_FIELD
|
||||
}
|
||||
|
||||
class PublicModelMixin(models.AbstractModel):
|
||||
_name = 'runbot.public.model.mixin'
|
||||
_description = 'Mixin for publicly accessible data'
|
||||
|
||||
@api.model
|
||||
def _valid_field_parameter(self, field: fields.Field, name: str):
|
||||
if field.type in SUPPORTED_FIELD_TYPES:
|
||||
return name in (
|
||||
# boolean, whether the field is readable through the public api,
|
||||
# public fields on record on which the user does not have access are not exposed.
|
||||
'public',
|
||||
) or super()._valid_field_parameter(field, name)
|
||||
return super()._valid_field_parameter(field, name)
|
||||
|
||||
@api.model
|
||||
def _get_public_fields(self) -> List[fields.Field]:
|
||||
""" Returns a list of publicly readable fields. """
|
||||
return [
|
||||
field for field in self._fields.values()
|
||||
if getattr(field, 'public', None) or field.name == 'id'
|
||||
]
|
||||
|
||||
########## REQUESTS ##########
|
||||
|
||||
@api.model
|
||||
def _api_request_allow_direct_access(self) -> bool:
|
||||
""" Returns whether this model is accessible directly through the api. """
|
||||
return True
|
||||
|
||||
@api.model
|
||||
def _api_request_allowed_keys(self) -> set[str]:
|
||||
""" Returns a list of allowed keys for request_data. """
|
||||
return self._api_request_required_keys() | {
|
||||
'context',
|
||||
'limit', 'offset',
|
||||
}
|
||||
|
||||
@api.model
|
||||
def _api_request_default_limit(self) -> int:
|
||||
return DEFAULT_LIMIT
|
||||
|
||||
@api.model
|
||||
def _api_request_max_limit(self) -> int:
|
||||
return DEFAULT_MAX_LIMIT
|
||||
|
||||
@api.model
|
||||
def _api_request_required_keys(self) -> set[str]:
|
||||
""" Returns a list of required keys for request_data. """
|
||||
required_keys = {'specification', 'domain'}
|
||||
if self._api_request_requires_project():
|
||||
required_keys.add('project_id')
|
||||
return required_keys
|
||||
|
||||
@api.model
|
||||
def _api_request_requires_project(self) -> bool: #TODO: rename me
|
||||
""" Public models are by default based on a project_id (filtered on project_id). """
|
||||
return self._api_request_allow_direct_access()
|
||||
|
||||
@api.model
|
||||
def _api_project_id_field_path(self) -> str:
|
||||
""" Returns the path from the current object to project_id. """
|
||||
raise NotImplementedError('_api_project_id_field_path not implemented')
|
||||
|
||||
@api.model
|
||||
def _api_request_validate_domain(self, domain: list[str | tuple | list]):
|
||||
"""
|
||||
Validates a domain against the public spec.
|
||||
|
||||
This only validates that all the fields in the domain are queryable fields,
|
||||
the actual validity of the domain will be checked by the orm when
|
||||
searching for records.
|
||||
|
||||
Returns:
|
||||
domain: a transformed domain if necessary
|
||||
|
||||
Raises:
|
||||
AssertionError: unknown domain leaf
|
||||
Forbidden: invalid field used
|
||||
"""
|
||||
|
||||
try:
|
||||
self._where_calc(domain)
|
||||
except ValueError as e:
|
||||
raise BadRequest('Invalid domain') from e
|
||||
|
||||
spec: Specification = self._api_public_specification()
|
||||
# recompiles the spec into a list of fields that can be present in the domain
|
||||
valid_fields: str[str] = set()
|
||||
def _visit_spec(spec, prefix: str | None = None):
|
||||
spec = _cleaned_spec(spec)
|
||||
if not spec:
|
||||
return
|
||||
for field, sub_spec in spec.items():
|
||||
this_field = f'{prefix}.{field}' if prefix else field
|
||||
valid_fields.add(this_field)
|
||||
if sub_spec and sub_spec.get('fields'):
|
||||
_visit_spec(sub_spec['fields'], prefix=this_field)
|
||||
_visit_spec(spec)
|
||||
|
||||
for leaf in domain:
|
||||
if not isinstance(leaf, (tuple, list)):
|
||||
continue
|
||||
assert len(leaf) == 3 # Can this happen in a valid domain?
|
||||
if leaf[0] not in valid_fields and not self.env.user.has_group('runbot.group_runbot_admin'):
|
||||
raise Forbidden('Trying to filter from private field')
|
||||
|
||||
if self._api_request_requires_project():
|
||||
assert 'project_id' in self.env.context
|
||||
domain = expression.AND([
|
||||
[(self._api_project_id_field_path(), '=', self.env.context['project_id'])],
|
||||
domain
|
||||
])
|
||||
|
||||
return domain
|
||||
|
||||
@api.model
|
||||
def _api_request_read_get_offset_limit(self, request_data: dict) -> tuple[int, int]:
|
||||
if 'limit' in request_data:
|
||||
if not isinstance(request_data['limit'], int):
|
||||
raise BadRequest('Invalid page size (should be int)')
|
||||
limit = request_data['limit']
|
||||
if limit > self._api_request_max_limit():
|
||||
raise BadRequest('Page size exceeds max size')
|
||||
else:
|
||||
limit = self._api_request_default_limit()
|
||||
offset = 0
|
||||
if 'offset' in request_data:
|
||||
if not isinstance(request_data['offset'], int):
|
||||
raise BadRequest('Invalid page (should be int)')
|
||||
offset = request_data['offset']
|
||||
return limit, offset
|
||||
|
||||
@api.model
|
||||
def _api_request_read_get_records(self, request_data: dict) -> Self:
|
||||
limit, offset = self._api_request_read_get_offset_limit(request_data)
|
||||
return self.search(request_data['domain'], limit=limit, offset=offset)
|
||||
|
||||
@api.model
|
||||
def _api_request_read(self, request_data: dict) -> list[dict]:
|
||||
"""
|
||||
Processes a frontend request and returns the data to be returned by the controller.
|
||||
|
||||
This method is allowed to raise Http specific exceptions.
|
||||
"""
|
||||
specification, domain = request_data['specification'], request_data['domain']
|
||||
|
||||
try:
|
||||
if not self._api_verify_specification(specification) and\
|
||||
not self.env.user.has_group('runbot.group_runbot_admin'):
|
||||
raise Forbidden('Invalid specification or trying to access private data.')
|
||||
except (ValueError, AssertionError) as e:
|
||||
raise BadRequest('Invalid specification') from e
|
||||
|
||||
request_data['domain'] = self._api_request_validate_domain(domain)
|
||||
records = self._api_request_read_get_records(request_data)
|
||||
|
||||
return records._api_read(request_data['specification'])
|
||||
|
||||
########## SPEC ##########
|
||||
|
||||
@api.model
|
||||
def _api_get_relation_field_key(self, field: fields.Field):
|
||||
""" Returns a relation cache key for a field, a string defining the identity of the relationship. """
|
||||
if isinstance(field, fields.Many2one):
|
||||
return f'{self._name}__{field.name}'
|
||||
elif isinstance(field, fields.Many2many):
|
||||
if not field.store:
|
||||
return f'{self._name}__{field.name}'
|
||||
return field.relation
|
||||
elif isinstance(field, fields.One2many):
|
||||
if not field.store: # is this valid?
|
||||
return f'{self._name}__{field.name}'
|
||||
CoModel: PublicModelMixin = self.env[field.comodel_name]
|
||||
inverse_field = CoModel._fields[field.inverse_name]
|
||||
return CoModel._api_get_relation_field_key(inverse_field)
|
||||
raise NotImplementedError('Unsupported field')
|
||||
|
||||
@tools.ormcache()
|
||||
@api.model
|
||||
def _api_public_specification(self) -> Specification:
|
||||
"""
|
||||
Returns the public specification for the model.
|
||||
|
||||
The specification will go through all the fields marked as public.
|
||||
For relational fields, the result will be nested (up to a depth of :code:`SPEC_MAX_DEPTH`).
|
||||
|
||||
The specification will contain metadata about each fields.
|
||||
The specification returned by this method can be used directly with :code:`_api_read`.
|
||||
|
||||
Returns:
|
||||
specification: The specification as a dictionary.
|
||||
"""
|
||||
# We want to prevent infinite loops so we need to track which relations
|
||||
# have already been explored, this concerns many2one, many2many
|
||||
def _visit_model(model: PublicModelMixin, visited_relations: set[str], depth = 0) -> Specification | SubSpecification:
|
||||
spec: Specification | SubSpecification = {}
|
||||
for field in model._get_public_fields():
|
||||
field_metadata = {
|
||||
'__type': field.type,
|
||||
}
|
||||
if field.help:
|
||||
field_metadata['__help'] = field.help
|
||||
if field.relational and \
|
||||
issubclass(self.pool[field.comodel_name], PublicModelMixin):
|
||||
field_key = model._api_get_relation_field_key(field)
|
||||
if field_key in visited_relations or depth == SPEC_MAX_DEPTH:
|
||||
continue
|
||||
visited_relations.add(field_key)
|
||||
CoModel: PublicModelMixin = model.env[field.comodel_name]
|
||||
field_metadata.update(
|
||||
fields=_visit_model(CoModel, {*visited_relations}, depth + 1)
|
||||
)
|
||||
spec[field.name] = field_metadata
|
||||
return spec
|
||||
|
||||
return _visit_model(self, set())
|
||||
|
||||
@api.model
|
||||
def _api_verify_specification(self, specification: Specification) -> bool:
|
||||
"""
|
||||
Verifies a given specification against the public specification.
|
||||
|
||||
This step also provides some validation of the specification, enough that
|
||||
the spec can be safely used with `_api_read` if the method does not
|
||||
raise an exception.
|
||||
|
||||
Args:
|
||||
specification: The requested specification.
|
||||
|
||||
Returns:
|
||||
If the spec matches the public spec this method returns True
|
||||
otherwise False.
|
||||
|
||||
Raises:
|
||||
ValueError: If a sub spec is given for a non relational field.
|
||||
ValueError: If a sub spec is given for a relational field that does
|
||||
not allow public data (id only).
|
||||
"""
|
||||
public_specification: Specification = self._api_public_specification()
|
||||
|
||||
def _visit_spec(
|
||||
model_spec: Specification,
|
||||
request_spec: Specification,
|
||||
) -> bool:
|
||||
request_spec = _cleaned_spec(request_spec)
|
||||
for field, sub_spec in request_spec.items():
|
||||
sub_spec = _cleaned_spec(sub_spec)
|
||||
if field not in model_spec:
|
||||
return False
|
||||
if not isinstance(sub_spec, dict):
|
||||
raise ValueError(
|
||||
'Invalid sub spec, should be a dict.'
|
||||
)
|
||||
# For now we actually only have keys for relational fields.
|
||||
sub_spec_allowed_keys = set()
|
||||
if model_spec[field].get('__type') in RELATIONAL_FIELD_TYPES\
|
||||
and 'fields' in model_spec[field]:
|
||||
sub_spec_allowed_keys.add('fields')
|
||||
sub_spec_allowed_keys.add('context')
|
||||
if set(sub_spec.keys()) - sub_spec_allowed_keys:
|
||||
raise ValueError(
|
||||
'Invalid sub spec, contains unknown keys.'
|
||||
)
|
||||
if not sub_spec or 'fields' not in sub_spec:
|
||||
continue
|
||||
if 'fields' not in model_spec[field]:
|
||||
raise ValueError(
|
||||
f'Sub spec not available for field {field}'
|
||||
)
|
||||
if not _visit_spec(model_spec[field]['fields'], sub_spec['fields']):
|
||||
return False
|
||||
return True
|
||||
|
||||
return _visit_spec(public_specification, specification)
|
||||
|
||||
def _api_read(self, specification: Specification) -> list[dict]:
|
||||
""" Forwards the specification to `web_read`. """
|
||||
return self.web_read(specification)
|
@ -16,3 +16,4 @@ from . import test_commit
|
||||
from . import test_upgrade
|
||||
from . import test_dockerfile
|
||||
from . import test_host
|
||||
from . import test_public_api
|
||||
|
260
runbot/tests/test_public_api.py
Normal file
260
runbot/tests/test_public_api.py
Normal file
@ -0,0 +1,260 @@
|
||||
import json
|
||||
|
||||
from werkzeug.exceptions import BadRequest, Forbidden
|
||||
|
||||
from odoo.osv import expression
|
||||
from odoo.tests.common import HttpCase, TransactionCase, tagged, new_test_user
|
||||
|
||||
from odoo.addons.runbot.models.public_model_mixin import PublicModelMixin
|
||||
|
||||
|
||||
@tagged('-at_install', 'post_install')
|
||||
class TestPublicApi(HttpCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
cls.project = cls.env['runbot.project'].create({'name': 'Tests', 'process_delay': 0})
|
||||
|
||||
def get_public_models(self):
|
||||
for Model in self.registry.values():
|
||||
if not issubclass(Model, PublicModelMixin) or Model._name == 'runbot.public.model.mixin':
|
||||
continue
|
||||
yield self.env[Model._name]
|
||||
|
||||
|
||||
def test_requires_project_defines_project_id_path(self):
|
||||
for Model in self.get_public_models():
|
||||
if not Model._api_request_requires_project():
|
||||
continue
|
||||
# Try calling _api_project_id_field_path, none should fail
|
||||
with self.subTest(model=Model._name):
|
||||
try:
|
||||
Model._api_project_id_field_path()
|
||||
except NotImplementedError:
|
||||
self.fail('_api_project_id_field_path not implemented')
|
||||
|
||||
def test_direct_access_disabled(self):
|
||||
DisabledModel = self.env['runbot.commit.link']
|
||||
self.assertFalse(DisabledModel._api_request_allow_direct_access())
|
||||
|
||||
resp = self.url_open('/runbot/api/models')
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
data = resp.json()
|
||||
self.assertNotIn(DisabledModel._name, data)
|
||||
|
||||
resp = self.url_open(f'/runbot/api/{DisabledModel._name}/spec')
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
|
||||
resp = self.url_open(f'/runbot/api/{DisabledModel._name}/read', data="{}", headers={'Content-Type': 'application/json'}) # model checking happens before data checking
|
||||
self.assertEqual(resp.status_code, 403)
|
||||
|
||||
def test_api_public_basics(self):
|
||||
# This serves as a basic read test through the api
|
||||
Model = self.env['runbot.bundle']
|
||||
self.assertTrue(Model._api_request_allow_direct_access())
|
||||
|
||||
resp = self.url_open('/runbot/api/models')
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
data = resp.json()
|
||||
self.assertIn(Model._name, data)
|
||||
|
||||
resp = self.url_open(f'/runbot/api/{Model._name}/spec')
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
request_data = json.dumps({
|
||||
'domain': [],
|
||||
'specification': resp.json()['specification'],
|
||||
'project_id': self.project.id,
|
||||
})
|
||||
resp = self.url_open(f'/runbot/api/{Model._name}/read', data=request_data, headers={'Content-Type': 'application/json'})
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
def test_api_read_from_spec_public_models(self):
|
||||
# This is not ideal as we don't have any data but it is better than nothing
|
||||
for Model in self.get_public_models():
|
||||
if not Model._api_request_allow_direct_access():
|
||||
continue
|
||||
with self.subTest(model=Model._name):
|
||||
resp = self.url_open(f'/runbot/api/{Model._name}/spec')
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
data = resp.json()
|
||||
if set(data['required_keys']) > self.env['runbot.public.model.mixin']._api_request_required_keys():
|
||||
self.skipTest('Skipping, request requires unknown keys, create a specific test')
|
||||
request_data = {
|
||||
'domain': [],
|
||||
'specification': data['specification'],
|
||||
}
|
||||
if Model._api_request_requires_project():
|
||||
request_data['project_id'] = self.project.id
|
||||
request_data = json.dumps(request_data)
|
||||
resp = self.url_open(f'/runbot/api/{Model._name}/read', data=request_data, headers={'Content-Type': 'application/json'})
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
def test_api_read_homepage(self):
|
||||
# Arbitrary test testing the initial schema required for the homepage
|
||||
# We only check that the response is successful
|
||||
request_data = json.dumps({
|
||||
'domain': [['last_batch', '!=', False]],
|
||||
'project_id': self.project.id,
|
||||
# 'category_id': False Ignored for the sake of the test
|
||||
'specification': {
|
||||
"name": {},
|
||||
"branch_ids": {
|
||||
"fields": {
|
||||
"dname": {},
|
||||
"branch_url": {}
|
||||
}
|
||||
},
|
||||
"last_batchs": {
|
||||
"fields": {
|
||||
"age": {},
|
||||
"last_update": {},
|
||||
"slot_ids": {
|
||||
"fields": {
|
||||
"link_type": {},
|
||||
"trigger_id": {
|
||||
"fields": {
|
||||
"name": {}
|
||||
}
|
||||
},
|
||||
"build_id": {
|
||||
"fields": {
|
||||
"local_state": {},
|
||||
"local_result": {},
|
||||
"global_state": {},
|
||||
"global_result": {},
|
||||
"requested_action": {},
|
||||
"log_list": {},
|
||||
"version_id": {},
|
||||
"config_id": {},
|
||||
"trigger_id": {},
|
||||
"create_batch_id": {},
|
||||
"host_id": {
|
||||
"fields": {
|
||||
"name": {}
|
||||
}
|
||||
},
|
||||
"database_ids": {
|
||||
"fields": {
|
||||
"name": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"commit_link_ids": {
|
||||
"fields": {
|
||||
"match_type": {},
|
||||
"commit_id": {
|
||||
"fields": {
|
||||
"dname": {},
|
||||
"subject": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
resp = self.url_open('/runbot/api/runbot.bundle/read', data=request_data, headers={'Content-Type': 'application/json'})
|
||||
resp.raise_for_status()
|
||||
|
||||
@tagged('-at_install', 'post_install')
|
||||
class TestPublicModelApi(TransactionCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.project = self.env['runbot.project'].create({'name': 'Tests', 'process_delay': 0})
|
||||
self.basic_user = new_test_user(self.env, 'runbot')
|
||||
self.uid = self.basic_user
|
||||
# Context key used in some tests.
|
||||
self.BundleModel = self.env['runbot.bundle']\
|
||||
.with_context(project_id=self.project.id)\
|
||||
.with_user(self.basic_user)
|
||||
|
||||
def test_invalid_domain(self):
|
||||
# Unknown field
|
||||
with self.assertRaises(BadRequest):
|
||||
self.BundleModel._api_request_validate_domain([['booger', '=', 1]])
|
||||
|
||||
# Private field
|
||||
self.assertFalse(
|
||||
getattr(self.BundleModel._fields['modules'], 'public', False),
|
||||
'modules field is not private anymore, change to another private field',
|
||||
)
|
||||
with self.assertRaises(Forbidden):
|
||||
self.BundleModel._api_request_validate_domain(
|
||||
[('modules', '=', 1)]
|
||||
)
|
||||
|
||||
def test_valid_domain_add_project_id(self):
|
||||
self.assertTrue(self.BundleModel._api_request_requires_project())
|
||||
|
||||
self.assertEqual(
|
||||
self.BundleModel._api_request_validate_domain([]),
|
||||
[('project_id', '=', self.project.id)]
|
||||
)
|
||||
|
||||
def test_valid_domain(self):
|
||||
domain = [
|
||||
('name', '=', 'master'), # Basic field
|
||||
('project_id.name', '=', 'R&D'), # 1-level related field
|
||||
('project_id.trigger_ids.name', '=', 'Enterprise run'), # 2-level related field
|
||||
]
|
||||
self.assertListEqual(
|
||||
self.BundleModel._api_request_validate_domain(domain),
|
||||
expression.AND([
|
||||
[('project_id', '=', self.project.id)],
|
||||
domain,
|
||||
])
|
||||
)
|
||||
|
||||
def test_process_read_limits(self):
|
||||
request_data = {
|
||||
'domain': [],
|
||||
'specification': {},
|
||||
}
|
||||
# Test with non int limit
|
||||
with self.assertRaises(BadRequest):
|
||||
request_data['limit'] = 'test'
|
||||
self.BundleModel._api_request_read(request_data)
|
||||
# Test with limit above max
|
||||
with self.assertRaises(BadRequest):
|
||||
request_data['limit'] = self.BundleModel._api_request_max_limit() + 10
|
||||
self.BundleModel._api_request_read(request_data)
|
||||
request_data.pop('limit')
|
||||
# Test with invalid offset
|
||||
with self.assertRaises(BadRequest):
|
||||
request_data['offset'] = 'test'
|
||||
self.BundleModel._api_request_read(request_data)
|
||||
|
||||
def test_verify_spec_invalid(self):
|
||||
check = self.BundleModel._api_verify_specification
|
||||
# Test with unknown field
|
||||
self.assertFalse(
|
||||
check({
|
||||
'invalid_field': {}
|
||||
})
|
||||
)
|
||||
self.assertFalse(
|
||||
check({
|
||||
'project_id': {
|
||||
'fields': {
|
||||
'invalid_field': {}
|
||||
}
|
||||
}
|
||||
})
|
||||
)
|
||||
# Test with sub_spec not dict
|
||||
with self.assertRaises(ValueError):
|
||||
check({
|
||||
'name': ['i', 'don\'t', 'know']
|
||||
})
|
||||
# Test with unknown key in dict
|
||||
with self.assertRaises(ValueError):
|
||||
check({
|
||||
'name': {'fields': {}} # Non relational fields do not allow 'fields'
|
||||
})
|
1
runbot_test/__init__.py
Normal file
1
runbot_test/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from . import models as models
|
15
runbot_test/__manifest__.py
Normal file
15
runbot_test/__manifest__.py
Normal file
@ -0,0 +1,15 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
{
|
||||
'name': "runbot test",
|
||||
'summary': "Runbot test",
|
||||
'description': "Runbot test",
|
||||
'author': "Odoo SA",
|
||||
'website': "http://runbot.odoo.com",
|
||||
'category': 'Website',
|
||||
'version': '1.0',
|
||||
'depends': ['runbot'],
|
||||
'license': 'LGPL-3',
|
||||
'data': [
|
||||
'security/ir.model.access.csv',
|
||||
],
|
||||
}
|
70
runbot_test/models.py
Normal file
70
runbot_test/models.py
Normal file
@ -0,0 +1,70 @@
|
||||
from odoo import models, api, fields
|
||||
from odoo.addons.runbot.fields import JsonDictField
|
||||
|
||||
|
||||
class TestModelParent(models.Model):
|
||||
_name = 'runbot.test.model.parent'
|
||||
_inherit = ['runbot.public.model.mixin']
|
||||
_description = 'parent'
|
||||
|
||||
private_field = fields.Boolean(public=False)
|
||||
|
||||
field_bool = fields.Boolean(public=True)
|
||||
field_integer = fields.Integer(public=True)
|
||||
field_float = fields.Float(public=True)
|
||||
field_char = fields.Char(public=True)
|
||||
field_text = fields.Text(public=True)
|
||||
field_html = fields.Html(public=True)
|
||||
field_date = fields.Date(public=True)
|
||||
field_datetime = fields.Datetime(public=True)
|
||||
field_selection = fields.Selection(selection=[('a', 'foo'), ('b', 'bar')], public=True)
|
||||
field_json = JsonDictField(public=True)
|
||||
|
||||
field_many2one = fields.Many2one('runbot.test.model.parent', public=True)
|
||||
field_one2many = fields.One2many('runbot.test.model.child', 'parent_id', public=True)
|
||||
field_one2many_private = fields.One2many('runbot.test.model.child.private', 'parent_id', public=True)
|
||||
field_many2many = fields.Many2many(
|
||||
'runbot.test.model.parent', relation='test_model_relation', public=True,
|
||||
column1='col_1', column2='col_2',
|
||||
)
|
||||
|
||||
field_one2many_computed = fields.One2many('runbot.test.model.child', compute='_compute_one2many', public=True)
|
||||
field_many2many_computed = fields.Many2many('runbot.test.model.parent', compute='_compute_many2many', public=True)
|
||||
|
||||
@api.model
|
||||
def _api_request_allow_direct_access(self):
|
||||
return False
|
||||
|
||||
@api.depends()
|
||||
def _compute_one2many(self):
|
||||
self.field_one2many_computed = self.env['runbot.test.model.child'].search([])
|
||||
|
||||
@api.depends()
|
||||
def _compute_many2many(self):
|
||||
for rec in self:
|
||||
rec.field_many2many_computed = self.env['runbot.test.model.parent'].search([
|
||||
('id', '!=', rec.id),
|
||||
])
|
||||
|
||||
|
||||
class TestModelChild(models.Model):
|
||||
_name = 'runbot.test.model.child'
|
||||
_inherit = ['runbot.public.model.mixin']
|
||||
_description = 'child'
|
||||
|
||||
parent_id = fields.Many2one('runbot.test.model.parent', required=True, public=True)
|
||||
|
||||
data = fields.Integer(public=True)
|
||||
|
||||
@api.model
|
||||
def _api_request_allow_direct_access(self):
|
||||
return False
|
||||
|
||||
|
||||
class TestPrivateModelChild(models.Model):
|
||||
_name = 'runbot.test.model.child.private'
|
||||
_description = 'Private Child'
|
||||
|
||||
parent_id = fields.Many2one('runbot.test.model.parent', required=True)
|
||||
|
||||
data = fields.Integer()
|
8
runbot_test/security/ir.model.access.csv
Normal file
8
runbot_test/security/ir.model.access.csv
Normal file
@ -0,0 +1,8 @@
|
||||
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
|
||||
runbot_test.access_runbot_test_model_parent,access_runbot_test_model_parent,runbot_test.model_runbot_test_model_parent,base.group_user,1,0,0,0
|
||||
runbot_test.access_runbot_test_model_child,access_runbot_test_model_child,runbot_test.model_runbot_test_model_child,base.group_user,1,0,0,0
|
||||
runbot_test.access_runbot_test_model_child_private,access_runbot_test_model_child_private,runbot_test.model_runbot_test_model_child_private,base.group_user,1,0,0,0
|
||||
|
||||
runbot_test.access_runbot_test_model_parent_admin,access_runbot_test_model_parent_admin,runbot_test.model_runbot_test_model_parent,runbot.group_runbot_admin,1,1,1,1
|
||||
runbot_test.access_runbot_test_model_child_admin,access_runbot_test_model_child_admin,runbot_test.model_runbot_test_model_child,runbot.group_runbot_admin,1,1,1,1
|
||||
runbot_test.access_runbot_test_model_child_admin_private,access_runbot_test_model_child_admin_private,runbot_test.model_runbot_test_model_child_private,runbot.group_runbot_admin,1,1,1,1
|
|
1
runbot_test/tests/__init__.py
Normal file
1
runbot_test/tests/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from . import test_spec as test_spec
|
295
runbot_test/tests/test_spec.py
Normal file
295
runbot_test/tests/test_spec.py
Normal file
@ -0,0 +1,295 @@
|
||||
from odoo import fields
|
||||
from odoo.tests.common import TransactionCase, tagged, new_test_user
|
||||
|
||||
|
||||
@tagged('-at_install', 'post_install')
|
||||
class TestSpec(TransactionCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super().setUpClass()
|
||||
|
||||
cls.Parent = cls.env['runbot.test.model.parent']
|
||||
cls.Child = cls.env['runbot.test.model.child']
|
||||
cls.user_basic = new_test_user(
|
||||
cls.env, 'basic',
|
||||
)
|
||||
|
||||
def test_parent_spec(self):
|
||||
spec = self.Parent._api_public_specification()
|
||||
self.assertNotIn('private_field', spec)
|
||||
|
||||
self.assertEqual(
|
||||
spec['field_bool'],
|
||||
{
|
||||
'__type': 'boolean',
|
||||
}
|
||||
)
|
||||
|
||||
self.assertIn('field_many2one', spec)
|
||||
sub_spec = spec['field_many2one']['fields']
|
||||
self.assertIn('field_many2many', spec)
|
||||
self.assertIn('field_many2many', sub_spec)
|
||||
|
||||
self.assertTrue(self.Parent._api_verify_specification(spec))
|
||||
|
||||
def test_child_spec(self):
|
||||
spec = self.Child._api_public_specification()
|
||||
|
||||
self.assertIn('parent_id', spec)
|
||||
self.assertIn('data', spec)
|
||||
|
||||
sub_spec = spec['parent_id']
|
||||
# The reverse relation should not be part of the sub spec, as we already
|
||||
# traversed that relation
|
||||
self.assertNotIn('field_one2many', sub_spec)
|
||||
|
||||
self.assertTrue(self.Child._api_verify_specification(spec))
|
||||
|
||||
def test_parent_read_basic(self):
|
||||
today = fields.Date.today()
|
||||
now = fields.Datetime.now()
|
||||
parent = self.Parent.create({
|
||||
'field_bool': True,
|
||||
'field_date': today,
|
||||
'field_datetime': now,
|
||||
'field_json': {'test': 1}
|
||||
})
|
||||
self.assertEqual(
|
||||
parent._api_read({'field_bool': {}, 'field_date': {}, 'field_datetime': {}, 'field_json': {}}),
|
||||
[{
|
||||
'id': parent.id,
|
||||
'field_bool': True,
|
||||
'field_date': today,
|
||||
'field_datetime': now,
|
||||
'field_json': {'test': 1}
|
||||
}]
|
||||
)
|
||||
|
||||
def test_parent_read_m2o(self):
|
||||
parent_1 = self.Parent.create({'field_integer': 1})
|
||||
parent_2 = self.Parent.create({'field_integer': 2, 'field_many2one': parent_1.id})
|
||||
|
||||
# Read without sub spec
|
||||
self.assertEqual(
|
||||
parent_1._api_read({'field_many2one': {}}),
|
||||
[{'id': parent_1.id, 'field_many2one': False}]
|
||||
)
|
||||
self.assertEqual(
|
||||
parent_2._api_read({'field_many2one': {}}),
|
||||
[{'id': parent_2.id, 'field_many2one': parent_1.id}]
|
||||
)
|
||||
# Read with sub spec
|
||||
self.assertEqual(
|
||||
parent_1._api_read({'field_many2one': {'fields': {'field_integer': {}}}}),
|
||||
[{'id': parent_1.id, 'field_many2one': False}]
|
||||
)
|
||||
self.assertEqual(
|
||||
parent_2._api_read({'field_many2one': {'fields': {'field_integer': {}}}}),
|
||||
[{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_integer': parent_1.field_integer}}]
|
||||
)
|
||||
both = parent_1 | parent_2
|
||||
# Read both with sub spec
|
||||
self.assertEqual(
|
||||
both._api_read({'field_integer': {}, 'field_many2one': {'fields': {'field_integer': {}}}}),
|
||||
[
|
||||
{'id': parent_1.id, 'field_integer': parent_1.field_integer, 'field_many2one': False},
|
||||
{'id': parent_2.id, 'field_integer': parent_2.field_integer, 'field_many2one': {
|
||||
'id': parent_1.id, 'field_integer': parent_1.field_integer,
|
||||
}},
|
||||
]
|
||||
)
|
||||
|
||||
def test_parent_read_one2many(self):
|
||||
parent = self.Parent.create({
|
||||
'field_float': 13.37,
|
||||
'field_one2many': [
|
||||
(0, 0, {'data': 1}),
|
||||
(0, 0, {'data': 2}),
|
||||
(0, 0, {'data': 3}),
|
||||
]
|
||||
})
|
||||
parent_no_o2m = self.Parent.create({
|
||||
'field_float': 13.37,
|
||||
})
|
||||
|
||||
# Basic read
|
||||
self.assertEqual(
|
||||
parent._api_read({
|
||||
'field_float': {},
|
||||
'field_one2many': {
|
||||
'fields': {
|
||||
'data': {}
|
||||
},
|
||||
}
|
||||
}),
|
||||
[{
|
||||
'id': parent.id,
|
||||
'field_float': 13.37,
|
||||
'field_one2many': [
|
||||
{'id': parent.field_one2many[0].id, 'data': 1},
|
||||
{'id': parent.field_one2many[1].id, 'data': 2},
|
||||
{'id': parent.field_one2many[2].id, 'data': 3},
|
||||
]
|
||||
}]
|
||||
)
|
||||
self.assertEqual(
|
||||
parent_no_o2m._api_read({
|
||||
'field_float': {},
|
||||
'field_one2many': {
|
||||
'fields': {
|
||||
'data': {},
|
||||
}
|
||||
},
|
||||
}),
|
||||
[{
|
||||
'id': parent_no_o2m.id,
|
||||
'field_float': 13.37,
|
||||
'field_one2many': [],
|
||||
}]
|
||||
)
|
||||
|
||||
# Reading parent_id through field_one2many_computed is allowed since relationship is not known
|
||||
self.env.invalidate_all()
|
||||
spec = {
|
||||
'field_float': {},
|
||||
'field_one2many': {
|
||||
'fields': {
|
||||
'data': {},
|
||||
'parent_id': {
|
||||
'fields': {
|
||||
'field_float': {},
|
||||
}
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
self.assertEqual(
|
||||
parent._api_read(spec),
|
||||
[{
|
||||
'id': parent.id,
|
||||
'field_float': 13.37,
|
||||
'field_one2many': [
|
||||
{'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
|
||||
{'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
|
||||
{'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
|
||||
]
|
||||
}]
|
||||
)
|
||||
# It should not work with basic user
|
||||
self.assertFalse(parent.with_user(self.user_basic)._api_verify_specification(spec))
|
||||
# It should however work with the computed version
|
||||
spec['field_one2many_computed'] = spec['field_one2many']
|
||||
spec.pop('field_one2many')
|
||||
self.assertEqual(
|
||||
parent.with_user(self.user_basic)._api_read(spec),
|
||||
[{
|
||||
'id': parent.id,
|
||||
'field_float': 13.37,
|
||||
'field_one2many_computed': [
|
||||
{'id': parent.field_one2many[0].id, 'data': 1, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
|
||||
{'id': parent.field_one2many[1].id, 'data': 2, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
|
||||
{'id': parent.field_one2many[2].id, 'data': 3, 'parent_id': {'id': parent.id, 'field_float': 13.37}},
|
||||
]
|
||||
}]
|
||||
)
|
||||
|
||||
def test_parent_read_one2many_private(self):
|
||||
# Check the private one2many (field is public, model is not), we can read id but nothing else
|
||||
parent = self.Parent.create({
|
||||
'field_float': 13.37,
|
||||
'field_one2many_private': [
|
||||
(0, 0, {'data': 1}),
|
||||
]
|
||||
}).with_user(self.user_basic)
|
||||
spec = parent._api_public_specification()
|
||||
self.assertNotIn('fields', spec['field_one2many_private'])
|
||||
result = parent._api_read(spec)
|
||||
self.assertEqual(
|
||||
result[0]['field_one2many_private'],
|
||||
[parent.field_one2many_private.id],
|
||||
)
|
||||
with self.assertRaises(ValueError):
|
||||
parent._api_verify_specification({
|
||||
'field_one2many_private': {
|
||||
'fields': {
|
||||
'data': {}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
def test_parent_read_m2m(self):
|
||||
parent = self.Parent.create({
|
||||
'field_integer': 1,
|
||||
'field_many2many': [
|
||||
(0, 0, {'field_integer': 2}),
|
||||
],
|
||||
})
|
||||
other_parent = parent.field_many2many
|
||||
self.assertEqual(
|
||||
parent._api_read({'field_integer': {}, 'field_many2many': {}}),
|
||||
[
|
||||
{'id': parent.id, 'field_many2many': list(parent.field_many2many.ids), 'field_integer': 1},
|
||||
],
|
||||
)
|
||||
self.env.invalidate_all()
|
||||
spec = {
|
||||
'field_many2many_computed': {
|
||||
'fields': {
|
||||
'field_many2many_computed': {
|
||||
'fields': {
|
||||
'field_integer': {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.assertEqual(
|
||||
parent._api_read(spec),
|
||||
[
|
||||
{'id': parent.id, 'field_many2many_computed': [
|
||||
{'id': other_parent.id, 'field_many2many_computed': [
|
||||
{'id': parent.id, 'field_integer': 1}
|
||||
]}
|
||||
]}
|
||||
]
|
||||
)
|
||||
self.assertFalse(parent.with_user(self.user_basic)._api_verify_specification(spec))
|
||||
|
||||
def test_parent_read_cyclic_parent(self):
|
||||
parent_1 = self.Parent.create({
|
||||
'field_integer': 1,
|
||||
})
|
||||
parent_2 = self.Parent.create({
|
||||
'field_integer': 2,
|
||||
'field_many2one': parent_1.id,
|
||||
})
|
||||
parent_1.field_many2one = parent_2
|
||||
self.env.invalidate_all()
|
||||
both = (parent_1 | parent_2)
|
||||
self.assertEqual(
|
||||
both._api_read({'field_many2one': {}}),
|
||||
[
|
||||
{'id': parent_1.id, 'field_many2one': parent_2.id},
|
||||
{'id': parent_2.id, 'field_many2one': parent_1.id},
|
||||
]
|
||||
)
|
||||
self.assertEqual(
|
||||
both.with_user(self.user_basic)._api_read({'field_many2one': {}}),
|
||||
[
|
||||
{'id': parent_1.id, 'field_many2one': parent_2.id},
|
||||
{'id': parent_2.id, 'field_many2one': parent_1.id},
|
||||
]
|
||||
)
|
||||
spec = {'field_many2one': {'fields': {'field_many2one': {}}}}
|
||||
self.assertEqual(
|
||||
both._api_read(spec),
|
||||
[
|
||||
{'id': parent_1.id, 'field_many2one': {'id': parent_2.id, 'field_many2one': parent_1.id}},
|
||||
{'id': parent_2.id, 'field_many2one': {'id': parent_1.id, 'field_many2one': parent_2.id}},
|
||||
]
|
||||
)
|
||||
self.assertFalse(both.with_user(self.user_basic)._api_verify_specification(spec))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user