Odoo18-Base/addons/web/controllers/json.py
2025-01-06 10:57:38 +07:00

344 lines
14 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import ast
import logging
from collections import defaultdict
from datetime import date
from http import HTTPStatus
from urllib.parse import urlencode
import psycopg2.errors
from dateutil.relativedelta import relativedelta
from lxml import etree
from werkzeug.exceptions import BadRequest, NotFound
from odoo import http
from odoo.exceptions import AccessError
from odoo.http import request
from odoo.models import regex_object_name
from odoo.osv import expression
from odoo.tools.safe_eval import safe_eval
from .utils import get_action_triples
_logger = logging.getLogger(__name__)
class WebJsonController(http.Controller):
# for /json, the route should work in a browser, therefore type=http
@http.route('/json/<path:subpath>', auth='user', type='http', readonly=True)
def web_json(self, subpath, **kwargs):
self._check_json_route_active()
return request.redirect(
f'/json/1/{subpath}?{urlencode(kwargs)}',
HTTPStatus.TEMPORARY_REDIRECT
)
@http.route('/json/1/<path:subpath>', auth='bearer', type='http', readonly=True)
def web_json_1(self, subpath, **kwargs):
"""Simple JSON representation of the views.
Get the JSON representation of the action/view as it would be shown
in the web client for the same /odoo `subpath`.
Behaviour:
- When, the action resolves to a pair (Action, id), `form` view_type.
Otherwise when it resolves to (Action, None), use the given view_type
or the preferred one.
- View form uses `web_read`.
- If a groupby is given, use a read group.
Views pivot, graph redirect to a canonical URL with a groupby.
- Otherwise use a search read.
- If any parameter is missing, redirect to the canonical URL (one where
all parameters are set).
:param subpath: Path to the (window) action to execute
:param view_type: View type from which we generate the parameters
:param domain: The domain for searches
:param offset: Offset for search
:param limit: Limit for search
:param groupby: Comma-separated string; when set, executes a `web_read_group`
and groups by the given fields
:param fields: Comma-separates aggregates for the "group by" query
:param start_date: When applicable, minimum date (inclusive bound)
:param end_date: When applicable, maximum date (exclusive bound)
"""
self._check_json_route_active()
if not request.env.user.has_group('base.group_allow_export'):
raise AccessError(request.env._("You need export permissions to use the /json route"))
# redirect when the computed kwargs and the kwargs from the URL are different
param_list = set(kwargs)
def check_redirect():
# when parameters were added, redirect
if set(param_list) == set(kwargs):
return None
# for domains, make chars as safe
encoded_kwargs = urlencode(kwargs, safe="()[], '\"")
return request.redirect(
f'/json/1/{subpath}?{encoded_kwargs}',
HTTPStatus.TEMPORARY_REDIRECT
)
# Get the action
env = request.env
action, context, eval_context, record_id = self._get_action(subpath)
model = env[action.res_model].with_context(context)
# Get the view
view_type = kwargs.get('view_type')
if not view_type and record_id:
view_type = 'form'
view_id, view_type = get_view_id_and_type(action, view_type)
view = model.get_view(view_id, view_type)
spec = model._get_fields_spec(view)
# Simple case: form view with record
if view_type == 'form' or record_id:
if redirect := check_redirect():
return redirect
if not record_id:
raise BadRequest(env._("Missing record id"))
res = model.browse(int(record_id)).web_read(spec)[0]
return request.make_json_response(res)
# Find domain and limits
domains = [safe_eval(action.domain or '[]', eval_context)]
if 'domain' in kwargs:
# for the user-given domain, use only literal-eval instead of safe_eval
user_domain = ast.literal_eval(kwargs.get('domain') or '[]')
domains.append(user_domain)
else:
default_domain = get_default_domain(model, action, context, eval_context)
if default_domain and default_domain != expression.TRUE_DOMAIN:
kwargs['domain'] = repr(default_domain)
domains.append(default_domain)
try:
limit = int(kwargs.get('limit', 0)) or action.limit
offset = int(kwargs.get('offset', 0))
except ValueError as exc:
raise BadRequest(exc.args[0]) from exc
if 'offset' not in kwargs:
kwargs['offset'] = offset
if 'limit' not in kwargs:
kwargs['limit'] = limit
# Additional info from the view
view_tree = etree.fromstring(view['arch'])
# Add date domain for some view types
if view_type in ('calendar', 'gantt', 'cohort'):
try:
start_date = date.fromisoformat(kwargs['start_date'])
end_date = date.fromisoformat(kwargs['end_date'])
except ValueError as exc:
raise BadRequest(exc.args[0]) from exc
except KeyError:
start_date = end_date = None
date_domain = get_date_domain(start_date, end_date, view_tree)
domains.append(date_domain)
if 'start_date' not in kwargs or end_date not in kwargs:
kwargs.update({
'start_date': date_domain[0][2].isoformat(),
'end_date': date_domain[1][2].isoformat(),
})
# Add explicitly activity fields for an activity view
if view_type == 'activity':
domains.append([('activity_ids', '!=', False)])
# add activity fields
for field_name, field in model._fields.items():
if field_name.startswith('activity_') and field_name not in spec and field.is_accessible(env):
spec[field_name] = {}
# Group by
groupby, fields = get_groupby(view_tree, kwargs.get('groupby'), kwargs.get('fields'))
if groupby is not None and not kwargs.get('groupby'):
# add arguments to kwargs
kwargs['groupby'] = ','.join(groupby)
if 'fields' not in kwargs and fields:
kwargs['fields'] = ','.join(fields)
if groupby is None and fields:
# add fields to the spec
for field in fields:
spec.setdefault(field, {})
# Last checks before the query
if redirect := check_redirect():
return redirect
domain = expression.AND(domains)
# Reading a group or a list
if groupby:
res = model.web_read_group(
domain,
fields=fields or ['__count'],
groupby=groupby,
limit=limit,
lazy=False,
)
# pop '__domain' key
for value in res['groups']:
del value['__domain']
else:
res = model.web_search_read(
domain,
spec,
limit=limit,
offset=offset,
)
return request.make_json_response(res)
def _check_json_route_active(self):
# experimental route, only enabled in demo mode or when explicitly set
if not (request.env.ref('base.module_base').demo
or request.env['ir.config_parameter'].sudo().get_param('web.json.enabled')):
raise NotFound()
def _get_action(self, subpath):
def get_action_triples_():
try:
yield from get_action_triples(request.env, subpath, start_pos=1)
except ValueError as exc:
raise BadRequest(exc.args[0]) from exc
context = dict(request.env.context)
active_id, action, record_id = list(get_action_triples_())[-1]
action = action.sudo()
if action.usage == 'ir_actions_server' and action.path:
# force read-only evaluation of action_data
try:
with action.pool.cursor(readonly=True) as ro_cr:
if not ro_cr.readonly:
ro_cr.connection.set_session(readonly=True)
assert ro_cr.readonly
action_data = action.with_env(action.env(cr=ro_cr, su=False)).run()
except psycopg2.errors.ReadOnlySqlTransaction as e:
# never retry on RO connection, just leave
raise AccessError(action.env._("Unsupported server action")) from e
except ValueError as e:
# safe_eval wraps the error into a ValueError (as str)
if "ReadOnlySqlTransaction" not in e.args[0]:
raise
raise AccessError(action.env._("Unsupported server action")) from e
# transform data into a new record
action = action.env[action_data['type']]
action = action.new(action_data, origin=action.browse(action_data.pop('id')))
if action._name != 'ir.actions.act_window':
e = f"{action._name} are not supported server-side"
raise BadRequest(e)
eval_context = dict(
action._get_eval_context(action),
active_id=active_id,
context=context,
)
# update the context and return
context.update(safe_eval(action.context, eval_context))
return action, context, eval_context, record_id
def get_view_id_and_type(action, view_type: str | None) -> tuple[int | None, str]:
"""Extract the view id from the action"""
assert action._name == 'ir.actions.act_window'
view_modes = action.view_mode.split(',')
if not view_type:
view_type = view_modes[0]
for view_id, action_view_type in action.views:
if view_type == action_view_type:
break
else:
if view_type not in view_modes:
raise BadRequest(request.env._(
"Invalid view type '%(view_type)s' for action id=%(action)s",
view_type=view_type,
action=action.id,
))
view_id = False
return view_id, view_type
def get_default_domain(model, action, context, eval_context):
for ir_filter in model.env['ir.filters'].get_filters(model._name, action._origin.id):
if ir_filter['is_default']:
# user filters, static parsing only
default_domain = ast.literal_eval(ir_filter['domain'])
break
else:
def filters_from_context():
view_tree = None
for key, value in context.items():
if key.startswith('search_default_') and value:
filter_name = key[15:]
if not regex_object_name.match(filter_name):
raise ValueError(model.env._("Invalid default search filter name for %s", key))
if view_tree is None:
view = model.get_view(action.search_view_id.id, 'search')
view_tree = etree.fromstring(view['arch'])
if (element := view_tree.find(Rf'.//filter[@name="{filter_name}"]')) is not None:
# parse the domain
if domain := element.attrib.get('domain'):
yield domain
# not parsing context['group_by']
default_domain = expression.AND(
safe_eval(domain, eval_context)
for domain in filters_from_context()
)
return default_domain
def get_date_domain(start_date, end_date, view_tree):
if not start_date or not end_date:
start_date = date.today() + relativedelta(day=1)
end_date = start_date + relativedelta(months=1)
date_field = view_tree.attrib.get('date_start')
if not date_field:
raise ValueError("Could not find the date field in the view")
return [(date_field, '>=', start_date), (date_field, '<', end_date)]
def get_groupby(view_tree, groupby=None, fields=None):
"""Parse the given groupby and fields and fallback to the view if not provided.
Return the groupby as a list when given.
Otherwise find groupby and fields from the view.
:param view_tree: The xml tree of the view
:param groupby: string or None
:param fields: string or None
"""
if groupby:
groupby = groupby.split(',')
if fields:
fields = fields.split(',')
else:
fields = None
if groupby is not None:
return groupby, fields
if view_tree.tag in ('pivot', 'graph'):
# extract groupby from the view if we don't have any
field_by_type = defaultdict(list)
for element in view_tree.findall(r'./field'):
field_name = element.attrib.get('name')
if element.attrib.get('invisible', '') in ('1', 'true'):
field_by_type['invisible'].append(field_name)
else:
field_by_type[element.attrib.get('type', 'normal')].append(field_name)
# not reading interval from the attribute
groupby = [
*field_by_type.get('row', ()),
*field_by_type.get('col', ()),
*field_by_type.get('normal', ()),
]
if fields is None:
fields = field_by_type.get('measure', [])
return groupby, fields
if view_tree.attrib.get('default_group_by'):
# in case the kanban view (or other) defines a default grouping
# return the field name so it is added to the spec
field = view_tree.attrib.get('default_group_by')
return (None, [field] if field else [])
return None, None