353 lines
14 KiB
Python
353 lines
14 KiB
Python
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
import html
|
|
from base64 import b64encode
|
|
|
|
from datetime import date
|
|
|
|
from odoo.api import Environment
|
|
from odoo.fields import Command
|
|
from odoo.tests import tagged
|
|
from odoo.tools import file_open, mute_logger
|
|
|
|
from .test_common import TestHttpBase
|
|
|
|
CT_HTML = 'text/html; charset=utf-8'
|
|
CSRF_USER_HEADERS = {
|
|
"Sec-Fetch-Dest": "document",
|
|
"Sec-Fetch-Mode": "navigate",
|
|
"Sec-Fetch-Site": 'none',
|
|
"Sec-Fetch-User": "?1",
|
|
}
|
|
|
|
|
|
def read_group_list(model, domain=None, groupby=(), fields=('__count',)):
|
|
result = model.web_read_group(domain or [], groupby=groupby, fields=fields, lazy=False)
|
|
# transform result:
|
|
# - tuple into list
|
|
# - pop '__domain'
|
|
for group in result['groups']:
|
|
del group['__domain']
|
|
for k, v in group.items():
|
|
if isinstance(v, tuple):
|
|
group[k] = list(v)
|
|
return result
|
|
|
|
|
|
@tagged('-at_install', 'post_install')
|
|
class TestHttpWebJson_1(TestHttpBase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
# enable explicitely and make sure demo has permissions
|
|
cls.env['ir.config_parameter'].set_param('web.json.enabled', True)
|
|
cls.user_demo.write({
|
|
'groups_id': [Command.link(cls.env.ref('base.group_allow_export').id)],
|
|
})
|
|
|
|
cls.milky_way = cls.env.ref('test_http.milky_way')
|
|
cls.earth = cls.env.ref('test_http.earth')
|
|
with file_open('test_http/static/src/img/gizeh.png', 'rb') as file:
|
|
cls.gizeh_data = file.read()
|
|
cls.gizeh_b64 = b64encode(cls.gizeh_data).decode()
|
|
|
|
def url_open_json(self, url, *, expected_code=0):
|
|
url = f"/json/1{url}"
|
|
res = self.url_open(url, headers=CSRF_USER_HEADERS)
|
|
if expected_code is None:
|
|
pass
|
|
elif expected_code:
|
|
self.assertEqual(res.status_code, expected_code)
|
|
else:
|
|
# expected=0, raise for status
|
|
res.raise_for_status()
|
|
return res
|
|
|
|
def authenticate_demo(self):
|
|
self.authenticate('demo', 'demo')
|
|
user = self.user_demo
|
|
return Environment(self.env.cr, user.id, {'lang': user.lang, 'tz': user.tz})
|
|
|
|
def test_webjson_access_error(self):
|
|
self.authenticate_demo()
|
|
with self.assertLogs('odoo.http', 'WARNING') as capture:
|
|
res = self.url_open_json('/settings', expected_code=403)
|
|
self.assertEqual(res.headers['Content-Type'], 'text/html; charset=utf-8')
|
|
self.assertIn("You are not allowed to access", res.text)
|
|
self.assertEqual(len(capture.output), 1)
|
|
self.assertIn("You are not allowed to access", capture.output[0])
|
|
|
|
def test_webjson_access_error_crm(self):
|
|
action_crm = self.env['ir.actions.server'].sudo().search([('path', '=', 'crm')])
|
|
if not action_crm:
|
|
self.skipTest("crm is not installed")
|
|
|
|
self.authenticate_demo()
|
|
self.user_demo.groups_id += self.env.ref('sales_team.group_sale_salesman')
|
|
self.url_open_json('/crm')
|
|
|
|
self.env['ir.model.access'].search([
|
|
('model_id', '=', action_crm.model_id.id)
|
|
]).perm_read = False
|
|
|
|
with self.assertLogs('odoo.http', 'WARNING') as capture:
|
|
res = self.url_open_json('/crm', expected_code=403)
|
|
self.assertEqual(res.headers['Content-Type'], 'text/html; charset=utf-8')
|
|
self.assertIn("You are not allowed to access", res.text)
|
|
self.assertEqual(len(capture.output), 1)
|
|
self.assertIn("You are not allowed to access", capture.output[0])
|
|
|
|
def test_webjson_access_export(self):
|
|
# a simple call
|
|
url = f'/test_http.stargate/{self.earth.id}'
|
|
self.authenticate_demo()
|
|
res = self.url_open_json(url)
|
|
|
|
# remove export permssion
|
|
group_export = self.env.ref('base.group_allow_export')
|
|
self.user_demo.write({'groups_id': [Command.unlink(group_export.id)]})
|
|
|
|
# check that demo has no access to /json
|
|
with self.assertLogs('odoo.http', 'WARNING') as capture:
|
|
res = self.url_open_json(url, expected_code=403)
|
|
self.assertIn("need export permissions", res.text)
|
|
self.assertIn("need export permissions", capture.output[0])
|
|
|
|
def test_webjson_bad_stuff(self):
|
|
self.authenticate_demo()
|
|
|
|
with self.subTest(bad='action'):
|
|
res = self.url_open_json('/idontexist', expected_code=400)
|
|
self.assertEqual(res.headers['Content-Type'], CT_HTML)
|
|
self.assertIn(
|
|
"expected action at word 1 but found “idontexist”", res.text)
|
|
|
|
with self.subTest(bad='active_id'):
|
|
res = self.url_open_json('/5/test_http.stargate', expected_code=400)
|
|
self.assertEqual(res.headers['Content-Type'], CT_HTML)
|
|
self.assertIn("expected action at word 1 but found “5”", res.text)
|
|
|
|
with self.subTest(bad='record_id'):
|
|
res = self.url_open_json('/test_http.stargate/1/2', expected_code=400)
|
|
self.assertEqual(res.headers['Content-Type'], CT_HTML)
|
|
self.assertIn("expected action at word 3 but found “2”", res.text)
|
|
|
|
with self.subTest(bad='view_type'):
|
|
error = "Invalid view type 'idontexist'"
|
|
res = self.url_open_json('/res.users?view_type=idontexist', expected_code=400)
|
|
self.assertEqual(res.headers['Content-Type'], CT_HTML)
|
|
self.assertIn(error, html.unescape(res.text))
|
|
|
|
def test_webjson_form(self):
|
|
self.authenticate_demo()
|
|
res = self.url_open_json(f'/test_http.stargate/{self.earth.id}')
|
|
self.assertEqual(res.json(), {
|
|
'id': self.earth.id,
|
|
'name': self.earth.name,
|
|
'sgc_designation': self.earth.sgc_designation,
|
|
'galaxy_id': {'id': self.earth.galaxy_id.id,
|
|
'display_name': self.earth.galaxy_id.name},
|
|
'glyph_attach': self.gizeh_b64,
|
|
'glyph_inline': self.gizeh_b64,
|
|
})
|
|
|
|
def test_webjson_form_subtree(self):
|
|
env = self.authenticate_demo()
|
|
res = self.url_open_json(f'/test_http.galaxy/{self.milky_way.id}')
|
|
self.assertEqual(
|
|
res.json(),
|
|
self.milky_way.with_env(env).web_read({
|
|
'name': {},
|
|
'stargate_ids': {'fields': {
|
|
'name': {},
|
|
'sgc_designation': {}
|
|
}},
|
|
})[0],
|
|
)
|
|
|
|
def test_webjson_form_viewtype_list(self):
|
|
self.authenticate_demo()
|
|
url = f'/test_http.stargate/{self.earth.id}'
|
|
res = self.url_open_json(f'{url}?view_type=list')
|
|
self.assertEqual(res.json(), {
|
|
'id': self.earth.id,
|
|
'name': self.earth.name,
|
|
'sgc_designation': self.earth.sgc_designation,
|
|
})
|
|
|
|
def test_webjson_list(self):
|
|
env = self.authenticate_demo()
|
|
res = self.url_open_json('/test_http.stargate')
|
|
self.assertEqual(
|
|
res.json(),
|
|
env['test_http.stargate']
|
|
.web_search_read([], {'name': {}, 'sgc_designation': {}})
|
|
)
|
|
|
|
def test_webjson_list_limit_offset(self):
|
|
env = self.authenticate_demo()
|
|
url = '/test_http.stargate'
|
|
stargates = (
|
|
env['test_http.stargate']
|
|
.web_search_read([], {'name': {}, 'sgc_designation': {}})
|
|
)['records']
|
|
|
|
res_limit = self.url_open_json(f'{url}?limit=1')
|
|
self.assertEqual(res_limit.json(), {
|
|
'length': len(stargates),
|
|
'records': stargates[:1]
|
|
})
|
|
|
|
res_offset = self.url_open_json(f'{url}?offset=1')
|
|
self.assertEqual(res_offset.json(), {
|
|
'length': len(stargates),
|
|
'records': stargates[1:]
|
|
})
|
|
|
|
res_limit_offset = self.url_open_json(f'{url}?limit=1&offset=1')
|
|
self.assertEqual(res_limit_offset.json(), {
|
|
'length': len(stargates),
|
|
'records': stargates[1:2]
|
|
})
|
|
|
|
def test_webjson_list_domain(self):
|
|
env = self.authenticate_demo()
|
|
domain = [("address", "like", "gs38")]
|
|
res = self.url_open_json(f'/test_http.stargate?domain={domain!r}')
|
|
self.assertEqual(
|
|
res.json(),
|
|
env['test_http.stargate']
|
|
.web_search_read(domain, {'name': {}, 'sgc_designation': {}})
|
|
)
|
|
|
|
def test_webjson_list_domain_default_filter(self):
|
|
action_domain = [('availability', '>', 0.95)]
|
|
self.env.ref('test_http.action_window_stargate').domain = action_domain
|
|
env = self.authenticate_demo()
|
|
|
|
res = self.url_open_json('/test_http.stargate')
|
|
self.assertEqual(
|
|
res.json()["length"],
|
|
env['test_http.stargate'].search_count(action_domain),
|
|
)
|
|
|
|
user_domain = [('address', 'ilike', 'a')]
|
|
env['ir.filters'].create({
|
|
'domain': user_domain,
|
|
'model_id': 'test_http.stargate',
|
|
'name': 'Some def filter',
|
|
'is_default': True,
|
|
})
|
|
res = self.url_open_json('/test_http.stargate')
|
|
self.assertEqual(
|
|
res.json()["length"],
|
|
env['test_http.stargate'].search_count(action_domain + user_domain),
|
|
)
|
|
self.assertIn("ilike", res.url, "URL should contain a user domain")
|
|
self.assertNotIn(action_domain[0][0], res.url, "URL should not contain a domain with action domain")
|
|
|
|
def test_webjson_list_args(self):
|
|
env = self.authenticate_demo()
|
|
# create a default filter
|
|
domain = [("name", "ilike", "earth")]
|
|
self.env['ir.filters'].create({
|
|
'name': 'my filter',
|
|
'is_default': True,
|
|
'domain': domain,
|
|
'model_id': 'test_http.stargate',
|
|
})
|
|
res = self.url_open_json('/test_http.stargate')
|
|
self.assertEqual(
|
|
res.json(),
|
|
env['test_http.stargate']
|
|
.web_search_read(domain, {'name': {}, 'sgc_designation': {}})
|
|
)
|
|
# we should find one redirect with the domain in the URL
|
|
[hist] = res.history
|
|
self.assertEqual(hist.status_code, 307)
|
|
str_domain = str(domain).replace(' ', '+')
|
|
self.assertIn("limit=80", res.url)
|
|
self.assertIn(f"domain={str_domain}", res.url)
|
|
|
|
def test_webjson_pivot(self):
|
|
env = self.authenticate_demo()
|
|
res = self.url_open_json('/test_http.stargate?view_type=pivot')
|
|
self.assertEqual(
|
|
res.json(),
|
|
read_group_list(
|
|
env['test_http.stargate'], [], ['galaxy_id', 'has_galaxy_crystal'], ['availability']),
|
|
)
|
|
|
|
res = self.url_open_json('/test_http.stargate?view_type=pivot&groupby=has_galaxy_crystal&fields=availability:min')
|
|
self.assertEqual(
|
|
res.json(),
|
|
read_group_list(env['test_http.stargate'], [], ['has_galaxy_crystal'], ['availability:min']),
|
|
)
|
|
|
|
user_domain = [('availability', '>=', 0.95)]
|
|
env['ir.filters'].create({
|
|
'domain': user_domain,
|
|
'model_id': 'test_http.stargate',
|
|
'name': 'Some def filter',
|
|
'is_default': True,
|
|
})
|
|
res = self.url_open_json('/test_http.stargate?view_type=pivot&groupby=has_galaxy_crystal&fields=availability:min')
|
|
self.assertEqual(
|
|
res.json(),
|
|
read_group_list(env['test_http.stargate'], user_domain, ['has_galaxy_crystal'], ['availability:min']),
|
|
)
|
|
|
|
def test_webjson_graph(self):
|
|
env = self.authenticate_demo()
|
|
res = self.url_open_json('/test_http.stargate?view_type=graph')
|
|
self.assertEqual(
|
|
res.json(),
|
|
read_group_list(env['test_http.stargate'], [], ['galaxy_id']),
|
|
)
|
|
|
|
def test_webjson_activity(self):
|
|
env = self.authenticate_demo()
|
|
env['test_http.stargate'].search([], limit=1).activity_schedule(summary='test')
|
|
res = self.url_open_json('/test_http.stargate?view_type=activity')
|
|
# check that we have at least the following fields
|
|
expected_fields = ["activity_ids", "activity_summary", "activity_user_id", "galaxy_id"]
|
|
self.assertEqual(
|
|
sorted(field_name for field_name in res.json()["records"][0] if field_name in expected_fields),
|
|
expected_fields,
|
|
)
|
|
|
|
def test_webjson_calendar(self):
|
|
env = self.authenticate_demo()
|
|
today = date.today()
|
|
start_date_iso = today.replace(day=1).isoformat()
|
|
# check that we have the date in the URL
|
|
res = self.url_open_json('/test_http.stargate?view_type=calendar')
|
|
self.assertIn(f"start_date={start_date_iso}", res.url)
|
|
# check that we can filter using the date
|
|
last_date = max(
|
|
env['test_http.stargate'].search([('last_use_date', '!=', False)])
|
|
.mapped('last_use_date')
|
|
)
|
|
res = self.url_open_json(f'/test_http.stargate?view_type=calendar&domain=[]&start_date={last_date.isoformat()}&end_date=2099-01-01')
|
|
self.assertEqual(
|
|
res.json()["length"],
|
|
env['test_http.stargate'].search_count([('last_use_date', '>=', last_date)]),
|
|
)
|
|
|
|
def test_webjson_readonly(self):
|
|
env = self.authenticate_demo()
|
|
# test that we can write
|
|
env.ref('test_http.earth').copy()
|
|
# create the action that executes the same write
|
|
self.env['ir.actions.server'].create({
|
|
'name': 'test write',
|
|
'model_id': self.env['ir.model']._get('test_http.stargate').id,
|
|
'path': 'test_webjson_readonly',
|
|
'state': 'code',
|
|
'code': "action = {}\nmodel.env.ref('test_http.earth').copy()",
|
|
})
|
|
# test that is does NOT work
|
|
with mute_logger("odoo.http", "odoo.sql_db"):
|
|
res = self.url_open_json('/test_webjson_readonly', expected_code=403)
|
|
self.assertIn("Unsupported server action", res.text)
|