Odoo18-Base/odoo/addons/test_new_api/tests/test_properties.py

2981 lines
121 KiB
Python
Raw Permalink Normal View History

2025-01-06 10:57:38 +07:00
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import babel.dates
import datetime
import json
import unittest
from unittest.mock import patch
from odoo import Command
from odoo.exceptions import AccessError, UserError
from odoo.osv import expression
from odoo.tests import Form, TransactionCase, users
from odoo.tools import mute_logger, get_lang
class TestPropertiesMixin(TransactionCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.user = cls.env.user
cls.partner = cls.env['test_new_api.partner'].create({'name': 'Test Partner Properties'})
cls.partner_2 = cls.env['test_new_api.partner'].create({'name': 'Test Partner Properties 2'})
cls.test_user = cls.env['res.users'].create({
'name': 'Test',
'login': 'test',
'company_id': cls.env.company.id,
})
attributes_definition_1 = [{
'name': 'discussion_color_code',
'string': 'Color Code',
'type': 'char',
'default': 'blue',
}, {
'name': 'moderator_partner_id',
'string': 'Partner',
'type': 'many2one',
'comodel': 'test_new_api.partner',
}]
attributes_definition_2 = [{
'name': 'state',
'type': 'selection',
'string': 'Status',
'selection': [('draft', 'Draft'), ('progress', 'In Progress'), ('done', 'Done')],
'default': 'draft',
}]
cls.discussion_1 = cls.env['test_new_api.discussion'].create({
'name': 'Test Discussion',
'attributes_definition': attributes_definition_1,
'participants': [Command.link(cls.user.id)],
})
cls.discussion_2 = cls.env['test_new_api.discussion'].create({
'name': 'Test Discussion',
'attributes_definition': attributes_definition_2,
'participants': [Command.link(cls.user.id)],
})
cls.message_1 = cls.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': cls.discussion_1.id,
'author': cls.user.id,
'attributes': {
'discussion_color_code': 'Test',
'moderator_partner_id': cls.partner.id,
},
})
cls.message_2 = cls.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': cls.discussion_1.id,
'author': cls.user.id,
})
cls.message_3 = cls.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': cls.discussion_2.id,
'author': cls.user.id,
})
def _get_sql_properties(self, message):
self.env.flush_all()
self.env.cr.execute(
"""
SELECT attributes
FROM test_new_api_message
WHERE id = %s
""", (message.id, ),
)
value = self.env.cr.fetchone()
self.assertTrue(value)
return value[0]
def _get_sql_definition(self, discussion):
self.env.flush_all()
self.env.cr.execute(
"""
SELECT attributes_definition
FROM test_new_api_discussion
WHERE id = %s
""", (discussion.id, ),
)
value = self.env.cr.fetchone()
self.assertTrue(value and value[0])
return value[0]
class PropertiesCase(TestPropertiesMixin):
def test_properties_field(self):
self.assertTrue(isinstance(self.message_1.attributes, dict))
# testing assigned value
self.assertEqual(self.message_1.attributes, {
'discussion_color_code': 'Test',
'moderator_partner_id': self.partner.id,
})
self.assertEqual(self.message_2.attributes, {
'discussion_color_code': 'blue',
'moderator_partner_id': False,
})
# testing default value
self.assertEqual(
self.message_3.attributes, {'state': 'draft'},
msg='Should have taken the default value')
self.message_1.attributes = [
{'name': 'discussion_color_code', 'value': 'red'},
{'name': 'moderator_partner_id', 'value': self.partner_2.id},
]
self.assertEqual(self.message_1.attributes, {
'discussion_color_code': 'red',
'moderator_partner_id': self.partner_2.id,
})
self.env.invalidate_all()
self.assertEqual(self.message_1.attributes, {
'discussion_color_code': 'red',
'moderator_partner_id': self.partner_2.id,
})
# check that the value has been updated in the database
database_values = self._get_sql_properties(self.message_1)
self.assertTrue(isinstance(database_values, dict))
self.assertEqual(
database_values.get('discussion_color_code'), 'red',
msg='Value must be updated in the database')
# if we write False on the field, it should still
# return the properties definition for the web client
self.message_3.attributes = False
self.env.invalidate_all()
expected = self.discussion_2.attributes_definition
for property_definition in expected:
property_definition['value'] = False
self.assertEqual(self.message_3.read(['attributes'])[0]['attributes'], expected)
self.assertEqual(self.message_3.attributes, {
definition['name']: definition['value']
for definition in expected
})
def test_properties_field_parameters_cleanup(self):
# check that the keys not valid for the given type are removed
self.message_1.attributes = [{
'name': 'discussion_color_code',
'string': 'Color Code',
'type': 'char',
'default': 'blue',
'value': 'Test',
'definition_changed': True,
'selection': [['a', 'A']], # selection key is not valid for char type
}]
values = self._get_sql_definition(self.message_1.discussion)
self.assertEqual(values, [{
'name': 'discussion_color_code',
'string': 'Color Code',
'type': 'char',
'default': 'blue',
}])
def test_properties_field_injection(self):
for c in '!#"\'- |+/\\':
with self.assertRaises(ValueError):
self.message_1.attributes = [{
'name': f'discussion_color_code{c}',
'type': 'char',
'definition_changed': True
}]
with self.assertRaises(ValueError):
self.discussion_1.attributes_definition = [{
'name': f'discussion_color_code{c}',
'type': 'char',
}]
with self.assertRaises(ValueError):
self.message_1.attributes = [{
'name': 'a' * 513,
'type': 'char',
'definition_changed': True
}]
with self.assertRaises(ValueError):
self.discussion_1.attributes_definition = [{
'name': 'a' * 513,
'type': 'char',
}]
@mute_logger('odoo.fields')
def test_properties_field_write_batch(self):
"""Test the behavior of the write called in batch.
Simulate a write operation done by the web client.
"""
# mix both properties
properties_values = (self.message_1 | self.message_3).read(['attributes'])
properties_values = properties_values[0]['attributes'] + properties_values[1]['attributes']
for properties in properties_values:
if properties['name'] == 'discussion_color_code':
properties['value'] = 'orange'
elif properties['name'] == 'state':
properties['value'] = 'done'
elif properties['name'] == 'moderator_partner_id':
properties['value'] = self.partner_2.id
properties['definition_changed'] = True
(self.message_1 | self.message_3).write({'attributes': properties_values})
sql_values_1 = self._get_sql_properties(self.message_1)
sql_values_3 = self._get_sql_properties(self.message_3)
# definition of both child has been changed
self.assertEqual(sql_values_1, {'discussion_color_code': 'orange', 'moderator_partner_id': self.partner_2.id, 'state': 'done'})
self.assertEqual(sql_values_3, {'discussion_color_code': 'orange', 'moderator_partner_id': self.partner_2.id, 'state': 'done'})
@mute_logger('odoo.models.unlink', 'odoo.fields')
def test_properties_field_read_batch(self):
values = self.message_1.read(['attributes'])[0]['attributes']
self.assertEqual(len(values), 2)
self.assertEqual(values[0]['type'], 'char')
self.assertEqual(values[1]['type'], 'many2one')
self.message_2.attributes = [{
'name': 'discussion_color_code',
'type': 'char',
'string': 'Color Code',
'default': 'blue',
'value': 'Test',
}, {
'name': 'moderator_partner_id',
'type': 'many2one',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'value': (self.partner_2.id, "Bob"),
}]
expected_queries = [
# read the properties field value
''' SELECT "test_new_api_message"."id",
"test_new_api_message"."attributes"
FROM "test_new_api_message"
WHERE ("test_new_api_message"."id" IN %s)
''',
''' SELECT "test_new_api_message"."id",
"test_new_api_message"."discussion",
"test_new_api_message"."body",
"test_new_api_message"."author",
"test_new_api_message"."name",
"test_new_api_message"."important",
"test_new_api_message"."label"->>%s,
"test_new_api_message"."priority",
"test_new_api_message"."active",
"test_new_api_message"."create_uid",
"test_new_api_message"."create_date",
"test_new_api_message"."write_uid",
"test_new_api_message"."write_date"
FROM "test_new_api_message"
WHERE ("test_new_api_message"."id" IN %s)
''',
# read the definition on the definition record
''' SELECT "test_new_api_discussion"."id",
"test_new_api_discussion"."name",
"test_new_api_discussion"."moderator",
"test_new_api_discussion"."message_concat",
"test_new_api_discussion"."history",
"test_new_api_discussion"."attributes_definition",
"test_new_api_discussion"."create_uid",
"test_new_api_discussion"."create_date",
"test_new_api_discussion"."write_uid",
"test_new_api_discussion"."write_date"
FROM "test_new_api_discussion"
WHERE ("test_new_api_discussion"."id" IN %s)
''',
# check the many2one existence
''' SELECT "test_new_api_partner"."id"
FROM "test_new_api_partner"
WHERE "test_new_api_partner"."id" IN %s
''',
''' SELECT "test_new_api_partner"."id",
"test_new_api_partner"."name",
"test_new_api_partner"."create_uid",
"test_new_api_partner"."create_date",
"test_new_api_partner"."write_uid",
"test_new_api_partner"."write_date"
FROM "test_new_api_partner"
WHERE ("test_new_api_partner"."id" IN %s)
''',
]
self.env.invalidate_all()
with self.assertQueryCount(5), self.assertQueries(expected_queries):
self.message_1.read(['attributes'])
# read in batch a lot of records
discussions = [self.discussion_1, self.discussion_2]
partners = self.env['test_new_api.partner'].create([{'name': f'Test {i}'} for i in range(50)])
messages = self.env['test_new_api.message'].create([{
'name': f'Test Message {i}',
'discussion': discussions[i % 2].id,
'author': self.user.id,
'attributes': [{
'name': 'partner_id',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'value': partner.id,
'definition_changed': True,
}]
} for i, partner in enumerate(partners)])
self.env.invalidate_all()
with self.assertQueryCount(5), self.assertQueries(expected_queries):
values = messages.read(['attributes'])
# remove some partners in the list
partners[:20].unlink()
self.env.invalidate_all()
# 5 queries instead of 25 queries, thanks to the cache values that has been
# cleaned (the properties field can trust the cached value, the deleted ids
# are not in the cache even if they still exists in the database)
with self.assertQueryCount(5):
values = messages.read(['attributes'])
@mute_logger('odoo.fields')
def test_properties_field_delete(self):
"""Test to delete a property using the flag "definition_deleted"."""
self.message_1.attributes = [{
'name': 'discussion_color_code',
'string': 'Test color code',
'type': 'char',
'default': 'blue',
'value': 'purple',
}, {
'name': 'moderator_partner_id',
'string': 'Partner',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'value': [self.partner.id, 'Bob'],
'definition_deleted': True,
}]
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(
sql_definition, [{
'name': 'discussion_color_code',
'type': 'char',
'string': 'Test color code',
'default': 'blue',
}])
self.assertEqual(len(self.message_1.attributes), 1)
self.assertEqual(self.message_1.attributes, {'discussion_color_code': 'purple'})
@mute_logger('odoo.fields')
def test_properties_field_create_batch(self):
# first create to cache the access rights
self.env['test_new_api.message'].create({'name': 'test'})
with self.assertQueryCount(2):
messages = self.env['test_new_api.message'].create([{
'name': 'Test Message',
'discussion': False,
'author': self.user.id,
}, {
'name': 'Test Message',
'discussion': False,
'author': self.user.id,
}])
self.env.invalidate_all()
with self.assertQueryCount(7):
messages = self.env['test_new_api.message'].create([{
'name': 'Test Message',
'discussion': self.discussion_1.id,
'author': self.user.id,
'attributes': [{
# no name, should be automatically generated
'string': 'Discussion Color code',
'type': 'char',
'default': 'blue',
'value': 'purple',
'definition_changed': True,
}, {
# the name is already set and shouldn't be re-generated
'name': 'moderator_partner_id',
'string': 'Partner',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'value': self.partner.id,
'definition_changed': True,
}],
}, {
'name': 'Test Message',
'discussion': self.discussion_2.id,
'author': self.user.id,
'attributes': [{
'type': 'selection',
'string': 'Status',
'selection': [
('draft', 'Draft'),
('progress', 'In Progress'),
('done', 'Done'),
],
'default': 'draft',
'definition_changed': True,
}],
}])
self.env.invalidate_all()
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(len(sql_definition), 2)
# check the generated name
property_color_name = sql_definition[0]['name']
self.assertTrue(property_color_name, msg="Property name must have been generated")
self.assertEqual(sql_definition, [
{
'name': property_color_name,
'default': 'blue',
'string': 'Discussion Color code',
'type': 'char',
}, {
'name': 'moderator_partner_id',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'string': 'Partner',
}
])
self.assertEqual(
self.discussion_1.attributes_definition[0]['string'], 'Discussion Color code',
msg='Should have updated the definition record')
self.assertEqual(len(messages), 2)
sql_properties_1 = self._get_sql_properties(messages[0])
self.assertEqual(
sql_properties_1,
{'moderator_partner_id': self.partner.id,
property_color_name: 'purple'})
sql_properties_2 = self._get_sql_properties(messages[1])
status_name = self.discussion_2.attributes_definition[0]['name']
self.assertEqual(
sql_properties_2,
{status_name: 'draft'})
properties_values_1 = messages[0].attributes
properties_values_2 = messages[1].attributes
self.assertEqual(len(properties_values_1), 2, msg='Discussion 1 has 2 properties')
self.assertEqual(len(properties_values_2), 1, msg='Discussion 2 has 1 property')
self.assertEqual(properties_values_1, {
'moderator_partner_id': self.partner.id,
property_color_name: 'purple',
})
self.assertEqual(properties_values_2, {status_name: 'draft'},
msg='Should have taken the default value')
def test_properties_field_default(self):
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': self.discussion_2.id,
'author': self.user.id,
})
self.assertEqual(
message.attributes,
{'state': 'draft'},
msg='Should have taken the default value')
message.attributes = [{'name': 'state', 'value': None}]
self.assertEqual(
message.attributes,
{'state': False},
msg='Writing None should not reset to the default value')
# test the case where the definition record come from a default as well
def default_discussion(_record):
return self.discussion_2.id
with patch.object(self.env['test_new_api.message']._fields['discussion'], 'default', default_discussion):
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'author': self.user.id,
})
self.assertEqual(message.discussion, self.discussion_2)
self.assertEqual(
message.attributes,
{'state': 'draft'},
msg='Should have taken the default value')
# the definition record come from a default value
self.discussion_2.attributes_definition = [{
'name': 'test',
'type': 'char',
'default': 'default char',
}]
message = self.env['test_new_api.message'] \
.with_context(default_discussion=self.discussion_2) \
.create({'name': 'Test Message', 'author': self.user.id})
self.assertEqual(message.discussion, self.discussion_2)
self.assertEqual(message.attributes, {'test': 'default char'})
# test a default many2one
self.discussion_1.attributes_definition = [
{
'name': 'my_many2one',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'type': 'many2one',
# send the value like the web client does
'default': [self.partner.id, 'Bob'],
},
]
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(sql_definition[0]['default'], self.partner.id)
read_values = self.discussion_1.read(['attributes_definition'])[0]['attributes_definition']
self.assertEqual(
read_values[0]['default'],
(self.partner.id, self.partner.display_name),
msg='When reading many2one default, it should return the display name',
)
# read the default many2one and deactivate the display_name
read_values = self.discussion_1.read(['attributes_definition'], load=None)[0]['attributes_definition']
self.assertEqual(
read_values[0]['default'],
self.partner.id,
msg='If the display_name is deactivate, it should not return the display name',
)
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'author': self.user.id,
'discussion': self.discussion_1.id,
})
properties = message.read(['attributes'])[0]['attributes']
self.assertEqual(properties[0]['value'], (self.partner.id, self.partner.display_name))
self.assertEqual(message.attributes, {'my_many2one': self.partner.id})
# give a default value and a value for a many2one
# the default value must be ignored
property_definition = self.discussion_1.read(['attributes_definition'])[0]['attributes_definition']
property_definition[0]['value'] = (self.partner_2.id, 'Alice')
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'author': self.user.id,
'discussion': self.discussion_1.id,
'attributes': property_definition,
})
self.assertEqual(
message.attributes,
{'my_many2one': self.partner_2.id},
msg='Should not take the default value',
)
# default value but no parent are set
record = self.env['test_new_api.message'].create({
'attributes': {'my_many2one': self.partner_2.id},
})
self.assertFalse(self._get_sql_properties(record))
# default value but the parent has no definition
self.discussion_1.attributes_definition = []
record = self.env['test_new_api.message'].create({
'discussion': self.discussion_1.id,
'attributes': {'my_many2one': self.partner_2.id},
})
self.assertFalse(self._get_sql_properties(record))
# default value but the parent has no definition and we create a new property
self.discussion_1.attributes_definition = []
record = self.env['test_new_api.message'].create({
'discussion': self.discussion_1.id,
'attributes': [{
'name': 'test',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'default': self.partner_2.id,
'definition_changed': True,
}],
})
self.assertEqual(self._get_sql_properties(record), {'test': self.partner_2.id})
# default value, a parent is set and change the definition
record = self.env['test_new_api.message'].create({
'discussion': self.discussion_1.id,
'attributes': [{
'name': 'test',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'default': self.partner_2.id,
}, {
'name': 'my_char',
'type': 'char',
'default': 'my char',
'definition_changed': True,
}],
})
self.assertEqual(self._get_sql_properties(record), {'my_char': 'my char', 'test': self.partner_2.id})
# use the context to set the default value, the default key in the definition is ignored
# (e.g. when you create a new record in a Kanban view grouped by a property)
del property_definition[0]['value']
self.discussion_1.attributes_definition = property_definition
partner = self.env['test_new_api.partner'].create({'name': 'Test Default'})
message = self.env['test_new_api.message'] \
.with_context({'default_attributes.my_many2one': partner.id}) \
.create({
'name': 'Test Message',
'author': self.user.id,
'discussion': self.discussion_1.id,
'attributes': property_definition,
})
sql_values = self._get_sql_properties(message)
self.assertEqual(sql_values, {'my_many2one': partner.id})
properties = message.read(['attributes'])[0]['attributes']
self.assertEqual(properties[0]['value'], (partner.id, partner.display_name))
# "None" is a valid default value
del property_definition[0]['value']
message = self.env['test_new_api.message'] \
.with_context({'default_attributes.my_many2one': None}) \
.create({
'name': 'Test Message',
'author': self.user.id,
'discussion': self.discussion_1.id,
'attributes': property_definition,
})
sql_values = self._get_sql_properties(message)
self.assertEqual(sql_values, {'my_many2one': False})
properties = message.read(['attributes'])[0]['attributes']
self.assertEqual(properties[0]['value'], False)
def test_properties_field_read(self):
"""Test the behavior of the read method.
In comparison with a simple "record.properties", the read method should not
record a recordset for the many2one, but a tuple with the record id and
the record display_name.
"""
properties_values = (self.message_1 | self.message_3).read(['attributes'])
self.assertEqual(len(properties_values), 2)
properties_message_1 = properties_values[0]['attributes']
properties_message_3 = properties_values[1]['attributes']
self.assertTrue(isinstance(properties_message_1, list))
self.assertTrue(isinstance(properties_message_3, list))
self.assertEqual(len(properties_message_1), 2, msg="Message 1 has 2 properties")
self.assertEqual(len(properties_message_3), 1, msg="Message 3 has 1 property")
self.assertEqual(
properties_message_1[0]['name'], 'discussion_color_code',
msg='First message 1 property should be "discussion_color_code"')
self.assertEqual(
properties_message_1[1]['name'], 'moderator_partner_id',
msg='Second message 1 property should be "moderator_partner_id"')
self.assertEqual(
properties_message_3[0]['name'], 'state',
msg='First message 3 property should be "state"')
many2one_property = properties_message_1[1]
self.assertEqual(
many2one_property['string'], 'Partner',
msg='Definition must be present when reading child')
self.assertEqual(
many2one_property['type'], 'many2one',
msg='Definition must be present when reading child')
self.assertEqual(
many2one_property['comodel'], 'test_new_api.partner',
msg='Definition must be present when reading child')
self.assertEqual(many2one_property['value'], (self.partner.id, self.partner.display_name))
def test_properties_field_many2one_basic(self):
"""Test the basic (read, write...) of the many2one property."""
self.message_2.attributes = [
{
"name": "discussion_color_code",
"type": "char",
"string": "Color Code",
"default": "blue",
"value": False,
}, {
"name": "moderator_partner_id",
"type": "many2one",
"string": "Partner",
"comodel": "test_new_api.partner",
"value": self.partner_2.id,
},
]
self.assertFalse(self.message_2.attributes['discussion_color_code'])
self.assertEqual(self.message_2.attributes['moderator_partner_id'], self.partner_2.id)
sql_values = self._get_sql_properties(self.message_2)
self.assertEqual(
sql_values,
{'moderator_partner_id': self.partner_2.id,
'discussion_color_code': False})
# read the many2one
properties = self.message_2.read(['attributes'])[0]['attributes']
self.assertEqual(properties[1]['value'], (self.partner_2.id, self.partner_2.display_name))
self.assertEqual(properties[1]['comodel'], 'test_new_api.partner')
# should not be able to set a transient model
with self.assertRaises(ValueError):
self.message_2.attributes = [{
"name": "moderator_partner_id",
"type": "many2one",
"comodel": "test_new_api.transient_model",
"definition_changed": True,
}]
with self.assertRaises(ValueError):
self.discussion_1.attributes_definition = [{
"name": "moderator_partner_id",
"type": "many2one",
"comodel": "test_new_api.transient_model",
}]
@mute_logger('odoo.models.unlink', 'odoo.fields')
def test_properties_field_many2one_unlink(self):
"""Test the case where we unlink the many2one record."""
self.message_2.attributes = [{
'name': 'moderator_partner_id',
'value': self.partner.id,
}]
# remove the partner on message 2
self.partner.unlink()
with self.assertQueryCount(4):
# 1 query to read the field
# 1 query to read the definition
# 2 queries to check if the many2one still exists / display_name
self.assertFalse(self.message_2.read(['attributes'])[0]['attributes'][0]['value'])
# remove the partner, and use the read method
self.message_2.attributes = [{
'name': 'moderator_partner_id',
'value': self.partner_2.id,
}]
self.partner_2.unlink()
with self.assertQueryCount(4):
value = self.message_2.read(['attributes'])
value = value[0]['attributes']
self.assertFalse(value[1]['value'])
self.assertEqual(value[1]['comodel'], 'test_new_api.partner')
# many2one properties in a default value
partner = self.env['res.partner'].create({'name': 'test unlink'})
self.message_2.attributes = [{
'name': 'moderator_partner_id',
'type': 'many2one',
'comodel': 'res.partner',
'default': [partner.id, 'Bob'],
'definition_changed': True,
}]
self.assertEqual(
self.message_2.read(['attributes'])[0]['attributes'],
[{
'name': 'moderator_partner_id',
'type': 'many2one',
'comodel': 'res.partner',
'default': (partner.id, partner.display_name),
'value': False,
}],
)
partner.unlink()
self.assertEqual(
self.message_2.read(['attributes'])[0]['attributes'],
[{
'name': 'moderator_partner_id',
'type': 'many2one',
'comodel': 'res.partner',
'default': False,
'value': False,
}],
)
def test_properties_field_many2one_model_removed(self):
"""Test the case where we uninstall a module, and the model does not exist anymore."""
# simulate a module uninstall, the model is not available now
# when reading the model / many2one, it should return False
self.message_1.attributes = [{
'name': 'message',
'value': self.message_3.id,
}]
self.env.flush_all()
self.env.cr.execute(
"""
UPDATE test_new_api_discussion
SET attributes_definition = '[{"name": "message", "comodel": "wrong_model", "type": "many2one"}]'
WHERE id = %s
""", (self.discussion_1.id, ),
)
self.env.invalidate_all()
values = self.discussion_1.read(['attributes_definition'])[0]
self.assertFalse(values['attributes_definition'][0]['comodel'])
attributes_definition = self.discussion_1.attributes_definition
self.assertEqual(
attributes_definition,
[{'name': 'message', 'comodel': False, 'type': 'many2one'}],
msg='The model does not exist anymore, it should return false',
)
# read the many2one on the child, should return False as well
self.assertFalse(self.message_1.attributes.get('message'))
values = self.message_1.read(['attributes'])[0]['attributes']
self.assertEqual(values[0]['type'], 'many2one', msg='Property type should be preserved')
self.assertFalse(values[0]['value'])
self.assertFalse(values[0]['comodel'])
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(
sql_definition,
[{'name': 'message', 'comodel': 'wrong_model', 'type': 'many2one'}],
msg='Do not clean the definition until we write on the field'
)
# write on the properties definition must clean the wrong model name
self.discussion_1.attributes_definition = self.discussion_1.attributes_definition
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(
sql_definition,
[{'name': 'message', 'comodel': False, 'type': 'many2one'}],
msg='Should have cleaned the model key',
)
def test_properties_field_domain(self):
self.discussion_1.attributes_definition = [{
'name': 'message',
'comodel': 'test_new_api.message',
'type': 'many2one',
'domain': "[('name', 'ilike', 'message')]",
}]
domain = self.message_1.read(['attributes'])[0]['attributes'][0]['domain']
self.assertEqual(domain, "[('name', 'ilike', 'message')]")
# set a wrong domain, it can happen if we uninstall a module
# and if a field defined in this module was used in the domain
self.env.flush_all()
new_properties = json.dumps([{
'name': 'message',
'comodel': 'test_new_api.message',
'type': 'many2one',
'domain': "[('wrong_field', 'ilike', 'test')]",
}])
self.env.cr.execute(
"""
UPDATE test_new_api_discussion
SET attributes_definition = %s
WHERE id = %s
""", (new_properties, self.discussion_1.id, ),
)
self.env.flush_all()
self.env.invalidate_all()
definition = self.discussion_1.read(['attributes_definition'])[0]['attributes_definition']
self.assertNotIn('domain', definition)
properties = self.message_1.read(['attributes'])[0]['attributes']
self.assertNotIn('domain', properties)
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertIn(
'domain',
sql_definition[0],
msg='The domain should remain in database until we write on the properties definition',
)
def test_properties_field_integer_float_boolean(self):
self.discussion_1.attributes_definition = [
{
'name': 'int_value',
'string': 'Int Value',
'type': 'integer',
}, {
'name': 'float_value',
'string': 'Float Value',
'type': 'float',
}, {
'name': 'boolean_value',
'string': 'Boolean Value',
'type': 'boolean',
},
]
self.message_1.attributes = [{
'name': 'int_value',
'value': 55555555555,
}, {
'name': 'float_value',
'value': 1.337,
}, {
'name': 'boolean_value',
'value': 77777, # should be converted into True
}]
self.env.invalidate_all()
self.assertEqual(self.message_1.attributes, {
'int_value': 55555555555,
'float_value': 1.337,
'boolean_value': True,
})
self.message_1.attributes = [{'name': 'boolean_value', 'value': 0}]
self.assertEqual(
self.message_1.attributes['boolean_value'], False,
msg='Boolean value must have been converted to False')
# When the user sets the value 0 for the property fields of type integer
# and float, the system should store the value 0 and shouldn't transform
# 0 to False (-> unset value).
self.message_1.attributes = {'int_value': 0, 'float_value': 0}
self.assertEqual(self.message_1.attributes, {
'int_value': 0,
'float_value': 0,
'boolean_value': False,
})
self.assertTrue(isinstance(self.message_1.attributes['int_value'], int))
self.assertTrue(isinstance(self.message_1.attributes['float_value'], int))
self.assertTrue(isinstance(self.message_1.attributes['boolean_value'], bool))
self.assertEqual(self._get_sql_properties(self.message_1), {'int_value': 0, 'float_value': 0, 'boolean_value': False})
def test_properties_field_integer_float_falsy_value_edge_cases(self):
self.discussion_1.attributes_definition = [
{
'name': 'int_value',
'string': 'Int Value',
'type': 'integer',
'default': 42
}, {
'name': 'float_value',
'string': 'Float Value',
'type': 'float',
'default': 0.42
}
]
message_1 = self.env['test_new_api.message'].create({
'discussion': self.discussion_1.id,
'author': self.user.id,
'attributes': {'int_value': 0, 'float_value': 0}
})
# When the user sets the value 0 for the property fields of type integer
# and float, the system shouldn't consider 0 as a falsy value and fallback
# to the default value.
self.assertEqual(message_1.attributes, {
'int_value': 0,
'float_value': 0,
})
self.assertTrue(isinstance(message_1.attributes['int_value'], int))
self.assertTrue(isinstance(message_1.attributes['float_value'], int))
self.assertEqual(self._get_sql_properties(message_1), {'int_value': 0, 'float_value': 0})
def test_properties_field_selection(self):
self.message_3.attributes = [{'name': 'state', 'value': 'done'}]
self.env.invalidate_all()
self.assertEqual(self.message_3.attributes, {'state': 'done'})
# the option might have been removed on the definition, write False
self.message_3.attributes = [{'name': 'state', 'value': 'unknown_selection'}]
self.env.invalidate_all()
self.assertEqual(self.message_3.attributes, {'state': False})
with self.assertRaises(ValueError):
# check that 2 options can not have the same id
self.discussion_1.attributes_definition = [
{
'name': 'option',
'type': 'selection',
'selection': [['a', 'A'], ['b', 'B'], ['a', 'C']],
}
]
self.message_3.attributes = [{
'type': 'selection',
'name': 'new_selection',
'string': 'My Selection',
'definition_changed': True,
}]
values = self.message_3.read(['attributes'])[0]['attributes'][0]
self.assertEqual(values.get('name'), 'new_selection')
self.assertEqual(values.get('selection'), [], 'Selection key should be at least an empty array (never False)')
def test_properties_field_separator(self):
"""Test the separator properties."""
self.message_1.attributes = [
{'name': 'boolean_value', 'value': 0, 'type': 'boolean', 'definition_changed': True},
{'type': 'separator', 'name': 'separator', 'string': 'Group 1'},
{'name': 'int_value', 'value': 0, 'type': 'integer'},
]
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(
sql_definition,
[
{'name': 'boolean_value', 'type': 'boolean'},
{'name': 'separator', 'type': 'separator', 'string': 'Group 1'},
{'name': 'int_value', 'type': 'integer'},
],
)
sql_values = self._get_sql_properties(self.message_1)
self.assertEqual(
sql_values, {'int_value': False, 'boolean_value': False},
msg='Separator should never be stored on the children, only in the definition record')
def test_properties_field_tags(self):
"""Test the behavior of the tag property.
The tags properties is basically the same as the selection property,
but you can select multiple values. It should work like the selection
(if we remove a value on the definition record, it should remove the value on each
child the next time we read, etc).
Each tags has a color index defined on the definition record.
"""
self.discussion_1.attributes_definition = [
{
'name': 'my_tags',
'string': 'My Tags',
'type': 'tags',
'tags': [
('be', 'BE', 1),
('fr', 'FR', 2),
('de', 'DE', 3),
('it', 'IT', 1),
],
'default': ['be', 'de'],
},
]
message = self.env['test_new_api.message'].create(
{'discussion': self.discussion_1.id, 'author': self.user.id})
self.assertEqual(message.attributes, {'my_tags': ['be', 'de']})
self.assertEqual(self._get_sql_properties(message), {'my_tags': ['be', 'de']})
self.env.invalidate_all()
# remove the DE tags on the definition
self.discussion_1.attributes_definition = [
{
'name': 'my_tags',
'string': 'My Tags',
'type': 'tags',
'tags': [
('be', 'BE', 1),
('fr', 'FR', 2),
('it', 'IT', 1),
],
'default': ['be', 'de'],
},
]
# the value must remain in the database until the next write on the child
self.assertEqual(self._get_sql_properties(message), {'my_tags': ['be', 'de']})
attributes = message.read(['attributes'])[0]['attributes']
self.assertEqual(
attributes[0]['value'],
['be'],
msg='The tag has been removed on the definition, should be removed when reading the child')
self.assertEqual(
message.attributes,
{'my_tags': ['be', 'de']})
# next write on the child must update the value
message.attributes = message.read(['attributes'])[0]['attributes']
self.assertEqual(self._get_sql_properties(message), {'my_tags': ['be']})
with self.assertRaises(ValueError):
# it should detect that the tag is duplicated
self.discussion_1.attributes_definition = [
{
'name': 'my_tags',
'type': 'tags',
'tags': [
('be', 'BE', 1),
('be', 'FR', 2),
],
},
]
self.message_3.attributes = [{
'type': 'tags',
'name': 'new_tags',
'string': 'My tags',
'definition_changed': True,
}]
values = self.message_3.read(['attributes'])[0]['attributes'][0]
self.assertEqual(values.get('name'), 'new_tags')
self.assertEqual(values.get('tags'), [], 'Tags key should be at least an empty array (never False)')
@mute_logger('odoo.models.unlink', 'odoo.fields')
def test_properties_field_many2many_basic(self):
"""Test the basic operation on a many2many properties (read, write...).
Check also that if we remove some record,
those are filtered when we read the child.
"""
partners = self.env['test_new_api.partner'].create([
{'name': f'Partner {i}'}
for i in range(20)
])
self.discussion_1.attributes_definition = [{
'name': 'moderator_partner_ids',
'string': 'Partners',
'type': 'many2many',
'comodel': 'test_new_api.partner',
}]
def name_get(records):
return list(zip(records._ids, records.mapped('display_name')))
with self.assertQueryCount(4):
self.message_1.attributes = [
{
"name": "moderator_partner_ids",
"string": "Partners",
"type": "many2many",
"comodel": "test_new_api.partner",
"value": list(zip(partners[:10]._ids, partners[:10].mapped('display_name'))),
}
]
attributes = self.message_1.read(['attributes'])[0]['attributes']
self.assertEqual(attributes[0]['value'], name_get(partners[:10]))
partners[:5].unlink()
with self.assertQueryCount(5):
attributes = self.message_1.read(['attributes'])[0]['attributes']
self.assertEqual(attributes[0]['value'], name_get(partners[5:10]))
partners[5].unlink()
with self.assertQueryCount(5):
properties = self.message_1.read(['attributes'])[0]['attributes']
self.assertEqual(properties[0]['value'], name_get(partners[6:10]))
# need to wait next write to clean data in database
# a single read won't clean the removed many2many
attributes = self.message_1.read(['attributes'])[0]['attributes']
self.message_1.invalidate_recordset()
self.message_1.attributes = attributes
sql_values = self._get_sql_properties(self.message_1)
self.assertEqual(sql_values, {'moderator_partner_ids': partners[6:10].ids})
# Check that duplicated ids are removed
self.env.flush_all()
moderator_partner_ids = partners[6:10].ids
moderator_partner_ids += moderator_partner_ids[2:]
new_value = json.dumps({"moderator_partner_ids": moderator_partner_ids})
self.env.cr.execute(
"""
UPDATE test_new_api_message
SET attributes = %s
WHERE id = %s
""", (new_value, self.message_1.id, ),
)
self.env.invalidate_all()
properties = self.message_1.read(['attributes'], load=None)[0]['attributes']
self.assertEqual(
properties[0]['value'],
name_get(partners[6:10]),
msg='Should removed duplicated ids',
)
# write a list with many2many values
self.message_1.attributes = [{
'name': 'partner_ids',
'string': 'Partners',
'type': 'many2many',
'comodel': 'test_new_api.partner',
'default': [(partners[8].id, 'Alice')],
'value': [(partners[9].id, 'Bob')],
'definition_changed': True,
}]
sql_properties = self._get_sql_properties(self.message_1)
self.assertEqual(sql_properties, {'partner_ids': [partners[9].id]})
sql_definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(sql_definition, [{
'name': 'partner_ids',
'string': 'Partners',
'type': 'many2many',
'comodel': 'test_new_api.partner',
'default': [partners[8].id],
}])
properties = self.message_1.read(['attributes'])[0]['attributes']
self.assertEqual(
properties,
[{
'name': 'partner_ids',
'string': 'Partners',
'type': 'many2many',
'comodel': 'test_new_api.partner',
'default': [(partners[8].id, partners[8].display_name)],
'value': [(partners[9].id, partners[9].display_name)],
}])
# should not be able to set a transient model
with self.assertRaises(ValueError):
self.message_2.attributes = [{
"name": "partner_ids",
"type": "many2many",
"comodel": "test_new_api.transient_model",
"definition_changed": True,
}]
with self.assertRaises(ValueError):
self.discussion_1.attributes_definition = [{
"name": "partner_ids",
"type": "many2many",
"comodel": "test_new_api.transient_model",
}]
@users('test')
@mute_logger('odoo.addons.base.models.ir_rule', 'odoo.fields')
def test_properties_field_many2many_filtering(self):
# a user read a properties with a many2many and he doesn't have access to all records
tags = self.env['test_new_api.multi.tag'].create(
[{'name': f'Test Tag {i}'} for i in range(10)])
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': self.discussion_1.id,
'author': self.user.id,
'attributes': [{
'name': 'my_tags',
'type': 'many2many',
'comodel': 'test_new_api.multi.tag',
'value': tags.ids,
'definition_changed': True,
}],
})
self.env['ir.rule'].sudo().create({
'name': 'test_rule_tags',
'model_id': self.env['ir.model']._get('test_new_api.multi.tag').id,
'domain_force': [('name', 'not in', tags[5:].mapped('name'))],
'perm_read': True,
'perm_create': True,
'perm_write': True,
})
self.env.invalidate_all()
values = message.read(['attributes'])[0]['attributes'][0]['value']
self.assertEqual(values, [(tag.id, None if i >= 5 else tag.name) for i, tag in enumerate(tags.sudo())])
def test_properties_field_performance(self):
self.env.invalidate_all()
with self.assertQueryCount(5):
# read to put the partner name in cache
self.message_1.read(['attributes'])
with self.assertQueryCount(0, msg='Must read value from cache'):
self.message_1.attributes
expected = ["""
UPDATE "test_new_api_message"
SET "attributes" = "__tmp"."attributes"::jsonb,
"write_date" = "__tmp"."write_date"::timestamp,
"write_uid" = "__tmp"."write_uid"::int4
FROM (VALUES %s) AS "__tmp"("id", "attributes", "write_date", "write_uid")
WHERE "test_new_api_message"."id" = "__tmp"."id"
"""]
with self.assertQueryCount(1), self.assertQueries(expected):
self.message_1.attributes = [
{
"name": "discussion_color_code",
"type": "char",
"string": "Color Code",
"default": "blue",
"value": "red"
},
{
"name": "moderator_partner_id",
"type": "many2one",
"string": "Partner",
"comodel": "test_new_api.partner",
"value": None
},
]
self.message_1.flush_recordset()
last_message_id = self.env['test_new_api.message'].search([], order="id DESC", limit=1).id
# based on batch optimization, _read_format should not crash on non existing records
values = self.env['test_new_api.message'].browse((self.message_1.id, last_message_id + 1))._read_format(['attributes'])
self.assertEqual(len(values), 1)
self.assertEqual(values[0]['id'], self.message_1.id)
def test_properties_field_change_definition(self):
"""Test the behavior of the field when changing the definition."""
attributes_definition = self.discussion_1.attributes_definition
self.message_1.attributes = [
{
"name": "discussion_color_code",
"value": None,
},
{
"name": "moderator_partner_id",
"value": None,
}
]
self.env.invalidate_all()
self.assertEqual(self.message_1.attributes, {
'discussion_color_code': False,
'moderator_partner_id': False,
})
# add a property on the definition record
attributes_definition += [{'name': 'state', 'string': 'State', 'type': 'char'}]
self.discussion_1.attributes_definition = attributes_definition
self.message_1.attributes = [{'name': 'state', 'value': 'ready'}]
self.env.invalidate_all()
self.assertEqual(self.message_1.attributes, {
'discussion_color_code': False,
'moderator_partner_id': False,
'state': 'ready',
})
# remove a property from the definition
# the properties on the child should remain, until we write on it
# when reading, the removed property must be filtered
self.discussion_1.attributes_definition = attributes_definition[:-1] # remove the state field
self.assertEqual(self.message_1.attributes, {
'discussion_color_code': False,
'moderator_partner_id': False,
'state': 'ready',
})
value = self._get_sql_properties(self.message_1)
self.assertEqual(value.get('state'), 'ready', msg='The field should be in database')
self.message_1.attributes = [{'name': 'name', 'value': 'Test name'}]
value = self._get_sql_properties(self.message_1)
self.assertFalse(
value.get('state'),
msg='After updating an other property, the value must be cleaned')
# check that we can only set a allowed list of properties type
with self.assertRaises(ValueError):
self.discussion_1.attributes_definition = [{'name': 'state', 'type': 'wrong_type'}]
# check the property ID unicity
with self.assertRaises(ValueError):
self.discussion_1.attributes_definition = [
{'name': 'state', 'type': 'char'},
{'name': 'state', 'type': 'datetime'},
]
@mute_logger('odoo.fields')
def test_properties_field_onchange2(self):
"""If we change the definition record, the onchange of the properties field must be triggered."""
message_form = Form(self.env['test_new_api.message'])
with self.assertQueryCount(8):
message_form.discussion = self.discussion_1
message_form.author = self.user
self.assertEqual(
message_form.attributes,
[{
'name': 'discussion_color_code',
'string': 'Color Code',
'type': 'char',
'default': 'blue',
'value': 'blue',
}, {
'name': 'moderator_partner_id',
'string': 'Partner',
'type': 'many2one',
'comodel': 'test_new_api.partner',
'value': False,
}],
msg='Should take the new definition when changing the definition record',
)
# change the discussion field
message_form.discussion = self.discussion_2
properties = message_form.attributes
self.assertEqual(len(properties), 1)
self.assertEqual(
properties[0]['name'],
'state',
msg='Should take the values of the new definition record',
)
with self.assertQueryCount(6):
message = message_form.save()
self.assertEqual(message.attributes, {'state': 'draft'})
# check cached value
cached_value = self.env.cache.get(message, message._fields['attributes'])
self.assertEqual(cached_value, {'state': 'draft'})
# change the definition record, change the definition and add default values
self.assertEqual(message.discussion, self.discussion_2)
with self.assertQueryCount(4):
message.discussion = self.discussion_1
self.assertEqual(
self.discussion_1.attributes_definition,
[{
'name': 'discussion_color_code',
'type': 'char',
'string': 'Color Code',
'default': 'blue',
}, {
'name': 'moderator_partner_id',
'type': 'many2one',
'string': 'Partner',
'comodel': 'test_new_api.partner',
}],
)
self.assertEqual(
message.read()[0]['attributes'],
[{
'name': 'discussion_color_code',
'type': 'char',
'string': 'Color Code',
'default': 'blue',
'value': 'blue',
}, {
'name': 'moderator_partner_id',
'type': 'many2one',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'value': False,
}],
)
self.discussion_1.attributes_definition = False
self.discussion_2.attributes_definition = [{
'name': 'test',
'type': 'char',
'default': 'Default',
}]
# change the message discussion to remove the properties
# discussion 1 -> discussion 2
message.discussion = self.discussion_2
message.attributes = [{'name': 'test', 'value': 'Test'}]
fields_spec = message._get_fields_spec()
self.assertIn('discussion', fields_spec)
self.assertIn('attributes', fields_spec)
values = {
'discussion': self.discussion_1.id,
'attributes': [{
'name': 'test',
'type': 'char',
'default': 'Default',
'value': 'Test',
}],
}
result = message.onchange(values, ['discussion'], fields_spec)
self.assertIn('attributes', result['value'], 'Should have detected the definition record change')
self.assertEqual(result['value']['attributes'], [], 'Should have reset the properties definition')
# change the message discussion to add new properties
# discussion 2 -> discussion 1
message.discussion = self.discussion_1
values = {
'discussion': self.discussion_2.id,
'attributes': [],
}
result = message.onchange(values, ['discussion'], fields_spec)
self.assertIn('attributes', result['value'], 'Should have detected the definition record change')
self.assertEqual(
result['value']['attributes'],
[{'name': 'test', 'type': 'char', 'default': 'Default', 'value': 'Default'}],
'Should have reset the properties definition to the discussion 1 definition',
)
# change the definition record and the definition at the same time
message_form = Form(message)
message_form.discussion = self.discussion_2
message_form.attributes = [{
'name': 'new_property',
'type': 'char',
'value': 'test value',
'definition_changed': True,
}]
message = message_form.save()
self.assertEqual(
self.discussion_2.attributes_definition,
[{'name': 'new_property', 'type': 'char'}])
self.assertEqual(
message.attributes,
{'new_property': 'test value'})
# re-write the same parent again and check that value are not reset
message.discussion = message.discussion
self.assertEqual(
message.attributes,
{'new_property': 'test value'})
# trigger a other onchange after setting the properties
# and check that it does not impact the properties
message.discussion.attributes_definition = []
message_form = Form(message)
message.attributes = [{
'name': 'new_property',
'type': 'char',
'value': 'test value',
'definition_changed': True,
}]
message_form.body = "a" * 42
message = message_form.save()
self.assertEqual(
message.attributes,
{'new_property': 'test value'})
@mute_logger('odoo.fields')
def test_properties_field_definition_update(self):
"""Test the definition update from the child."""
self.discussion_1.attributes_definition = []
self.message_1.attributes = [
{
'name': 'my_many2one',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'type': 'many2one',
# send the value like the web client does
'default': [self.partner.id, 'Bob'],
'value': [self.partner_2.id, "Test"],
}, {
'name': 'my_many2many',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'type': 'many2many',
# send the value like the web client does
'default': [[self.partner.id, 'Bob'], [self.partner_2.id, 'Test']],
'value': [[self.partner_2.id, "Test"]],
'definition_changed': True,
},
]
self.env.invalidate_all()
sql_definition = self._get_sql_definition(self.discussion_1)
expected_definition = [
{
'name': 'my_many2one',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'type': 'many2one',
'default': self.partner.id,
}, {
'name': 'my_many2many',
'string': 'Partner',
'comodel': 'test_new_api.partner',
'type': 'many2many',
'default': [self.partner.id, self.partner_2.id],
},
]
self.assertEqual(sql_definition, expected_definition)
sql_properties = self._get_sql_properties(self.message_1)
expected_properties = {
'my_many2one': self.partner_2.id,
'my_many2many': [self.partner_2.id],
}
self.assertEqual(expected_properties, sql_properties)
@mute_logger('odoo.fields')
@users('test')
def test_properties_field_security(self):
"""Check the access right related to the Properties fields."""
def _mocked_check_access(records, operation):
if records.env.su:
return
raise AccessError('')
message = self.message_1.with_user(self.test_user)
# a user read a properties with a many2one to a record he doesn't have access to
tag = self.env['test_new_api.multi.tag'].create({'name': 'Test Tag'})
message.attributes = [{
'name': 'test',
'type': 'many2one',
'comodel': 'test_new_api.multi.tag',
'value': [tag.id, 'Tag'],
'definition_changed': True,
}]
values = message.read(['attributes'])[0]['attributes'][0]
self.assertEqual(values['value'], (tag.id, 'Test Tag'))
self.env.invalidate_all()
with patch('odoo.addons.test_new_api.models.test_new_api.MultiTag.check_access', _mocked_check_access):
values = message.read(['attributes'])[0]['attributes'][0]
self.assertEqual(values['value'], (tag.id, None))
# a user read a properties with a many2one to a record
# but doesn't have access to its parent
self.env.invalidate_all()
with patch('odoo.addons.test_new_api.models.test_new_api.Discussion.check_access', _mocked_check_access):
values = message.read(['attributes'])[0]['attributes'][0]
self.assertEqual(values['value'], (tag.id, 'Test Tag'))
@users('test')
def test_properties_field_no_parent_access(self):
"""We can read the child, but not the definition record.
Check that the user does not get an `AccessError` when creating a new
record having a property field whose property definition is stored on
a record the user does not have access to. The newly created record
should have the right schema and should be populated with the default
values stored on the property definition.
"""
def _mocked_check_access(records, operation):
if records.env.su:
return
raise AccessError('')
self.env.invalidate_all()
with patch('odoo.addons.test_new_api.models.test_new_api.Discussion.check_access', _mocked_check_access):
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': self.discussion_1.id,
'author': self.user.id,
'attributes': {
'moderator_partner_id': self.partner.id,
}
})
self.assertEqual(message.attributes, {
'discussion_color_code': 'blue',
'moderator_partner_id': self.partner.id
})
def test_properties_inherits(self):
email = self.env['test_new_api.emailmessage'].create({
'discussion': self.discussion_1.id,
'attributes': [{
'name': 'discussion_color_code',
'type': 'char',
'string': 'Color Code',
'default': 'blue',
'value': 'red',
}],
})
values = email.read(['attributes'])
self.assertEqual(values[0]['attributes'][0]['value'], 'red')
values = email.message.read(['attributes'])
self.assertEqual(values[0]['attributes'][0]['value'], 'red')
class PropertiesSearchCase(TestPropertiesMixin):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.messages = cls.message_1 | cls.message_2 | cls.message_3
cls.env['test_new_api.message'].search([('id', 'not in', cls.messages.ids)]).unlink()
@mute_logger('odoo.fields')
def test_properties_field_search_boolean(self):
# search on boolean
self.message_1.attributes = [{
'name': 'myboolean',
'type': 'boolean',
'value': True,
'definition_changed': True,
}]
self.message_2.attributes = {'myboolean': False}
messages = self.env['test_new_api.message'].search([('attributes.myboolean', '=', True)])
self.assertEqual(messages, self.message_1)
messages = self.env['test_new_api.message'].search([('attributes.myboolean', '!=', False)])
self.assertEqual(messages, self.message_1)
messages = self.env['test_new_api.message'].search([('attributes.myboolean', '=', False)])
# message 2 has a falsy boolean properties
# message 3 doesn't have the properties (key in dict doesn't exist)
self.assertEqual(messages, self.message_2 | self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.myboolean', '!=', True)])
self.assertEqual(messages, self.message_2 | self.message_3)
@mute_logger('odoo.fields')
def test_properties_field_search_char(self):
# search on text properties
self.message_1.attributes = [{
'name': 'mychar',
'type': 'char',
'value': 'Test',
'definition_changed': True,
}]
self.message_2.attributes = {'mychar': 'TeSt'}
messages = self.env['test_new_api.message'].search([('attributes.mychar', '=', 'Test')])
self.assertEqual(messages, self.message_1, "Should be able to search on a properties field")
messages = self.env['test_new_api.message'].search([('attributes.mychar', '=', '"Test"')])
self.assertFalse(messages)
messages = self.env['test_new_api.message'].search([('attributes.mychar', 'ilike', 'test')])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.mychar', 'not ilike', 'test')])
self.assertFalse(messages)
messages = self.env['test_new_api.message'].search([('attributes.mychar', 'ilike', '"test"')])
self.assertFalse(messages)
for forbidden_char in '! ()"\'.':
searches = (
f'mychar{forbidden_char}',
f'my{forbidden_char}char',
f'{forbidden_char}mychar',
)
for search in searches:
with self.assertRaises(ValueError), self.assertQueryCount(0):
self.env['test_new_api.message'].search([(f'attributes.{search}', '=', 'Test')])
# search falsy properties
self.message_3.discussion = self.message_2.discussion
self.message_3.attributes = [{'name': 'mychar', 'value': False}]
self.assertEqual(self._get_sql_properties(self.message_3), {'mychar': False})
messages = self.env['test_new_api.message'].search([('attributes.mychar', '=', False)])
self.assertEqual(messages, self.message_3)
# search falsy properties when the key doesn't exist in the dict
# message 2 properties is False, message 3 properties doesn't exist in database
self.message_2.attributes = [{'name': 'mychar', 'value': False}]
self.env.cr.execute(
"UPDATE test_new_api_message SET attributes = '{}' WHERE id = %s",
[self.message_3.id],
)
messages = self.env['test_new_api.message'].search([('attributes.mychar', '=', False)])
self.assertEqual(messages, self.message_2 | self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mychar', '!=', False)])
self.assertEqual(messages, self.message_1)
# message 1 property contain a string but is not falsy so it's not returned
messages = self.env['test_new_api.message'].search([('attributes.mychar', '!=', True)])
self.assertEqual(messages, self.message_2 | self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mychar', '=', True)])
self.assertEqual(messages, self.message_1)
# message 3 is now null instead of being an empty dict
self.env.cr.execute(
"UPDATE test_new_api_message SET attributes = NULL WHERE id = %s",
[self.message_3.id],
)
messages = self.env['test_new_api.message'].search([('attributes.mychar', '=', False)])
self.assertEqual(messages, self.message_2 | self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mychar', '!=', False)])
self.assertEqual(messages, self.message_1)
@mute_logger('odoo.fields')
def test_properties_field_search_float(self):
# search on float
self.message_1.attributes = [{
'name': 'myfloat',
'type': 'float',
'value': 3.14,
'definition_changed': True,
}]
self.message_2.attributes = {'myfloat': 5.55}
messages = self.env['test_new_api.message'].search([('attributes.myfloat', '>', 4.4)])
self.assertEqual(messages, self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.myfloat', '<', 4.4)])
self.assertEqual(messages, self.message_1)
messages = self.env['test_new_api.message'].search([('attributes.myfloat', '>', 1.1)])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.myfloat', '<=', 1.1)])
self.assertFalse(messages)
messages = self.env['test_new_api.message'].search([('attributes.myfloat', '=', 3.14)])
self.assertEqual(messages, self.message_1)
@mute_logger('odoo.fields')
def test_properties_field_search_integer(self):
# search on integer
self.messages.discussion = self.discussion_1
self.message_1.attributes = [{
'name': 'myint',
'type': 'integer',
'value': 33,
'definition_changed': True,
}]
self.message_2.attributes = {'myint': 111}
self.message_3.attributes = {'myint': -2}
messages = self.env['test_new_api.message'].search([('attributes.myint', '>', 4)])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.myint', '<', 4)])
self.assertEqual(messages, self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.myint', '=', 111)])
self.assertEqual(messages, self.message_2)
# search on the JSONified value (operator "->>")
messages = self.env['test_new_api.message'].search([('attributes.myint', 'ilike', '1')])
self.assertEqual(messages, self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.myint', 'not ilike', '1')])
self.assertEqual(messages, self.message_1 | self.message_3)
@mute_logger('odoo.fields')
def test_properties_field_search_many2many(self):
self.messages.discussion = self.discussion_1
partners = self.env['res.partner'].create([{'name': 'A'}, {'name': 'B'}, {'name': 'C'}])
self.message_1.attributes = [{
'name': 'mymany2many',
'type': 'many2many',
'comodel': 'res.partner',
'value': partners.ids,
'definition_changed': True,
}]
self.message_2.attributes = {'mymany2many': [partners[1].id]}
self.message_3.attributes = {'mymany2many': [partners[2].id]}
messages = self.env['test_new_api.message'].search(
[('attributes.mymany2many', 'in', partners[0].id)])
self.assertEqual(messages, self.message_1)
messages = self.env['test_new_api.message'].search(
[('attributes.mymany2many', 'in', partners[1].id)])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search(
[('attributes.mymany2many', 'in', partners[2].id)])
self.assertEqual(messages, self.message_1 | self.message_3)
messages = self.env['test_new_api.message'].search(
[('attributes.mymany2many', 'not in', partners[0].id)])
self.assertEqual(messages, self.message_2 | self.message_3)
# IN operator (not supported on many2many and return weird results)
messages = self.env['test_new_api.message'].search(
[('attributes.mymany2many', 'in', [partners[0].id, partners[1].id])])
self.assertEqual(messages, self.message_2) # should be self.message_1 | self.message_2
@mute_logger('odoo.fields')
def test_properties_field_search_many2one(self):
# many2one are just like integer
self.messages.discussion = self.discussion_1
self.message_1.attributes = [{
'name': 'mypartner',
'type': 'integer',
'value': self.partner.id,
'definition_changed': True,
}]
self.message_2.attributes = {'mypartner': self.partner_2.id}
self.message_3.attributes = {'mypartner': False}
messages = self.env['test_new_api.message'].search(
[('attributes.mypartner', 'in', [self.partner.id, self.partner_2.id])])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search(
[('attributes.mypartner', 'not in', [self.partner.id, self.partner_2.id])])
self.assertEqual(messages, self.message_3)
messages = self.env['test_new_api.message'].search(
[('attributes.mypartner', 'ilike', self.partner.display_name)])
self.assertFalse(messages, "The ilike on relational properties is not supported")
@mute_logger('odoo.fields')
def test_properties_field_search_tags(self):
self.messages.discussion = self.discussion_1
self.message_1.attributes = [{
'name': 'mytags',
'type': 'tags',
'value': ['a', 'b'],
'tags': [['a', 'A', 1], ['b', 'B', 2], ['aa', 'AA', 3]],
'definition_changed': True,
}]
self.message_2.attributes = {'mytags': ['b']}
self.message_3.attributes = {'mytags': ['aa']}
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', 'a')])
self.assertEqual(messages, self.message_1)
# the search is done on the JSONified value (operator "->>")
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'ilike', 'a')])
self.assertEqual(messages, self.message_1 | self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'not ilike', 'a')])
self.assertEqual(messages, self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', 'b')])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', 'aa')])
self.assertEqual(messages, self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'not in', 'b')])
self.assertEqual(messages, self.message_3)
# the search is done on the JSONified value (operator "->>")
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'ilike', '["aa"]')])
self.assertEqual(messages, self.message_3)
# IN operator on array
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', [])])
self.assertFalse(messages)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'not in', [])])
self.assertEqual(messages, self.message_1 | self.message_2 | self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', ['a', 'b'])])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', ['b', 'a'])])
self.assertEqual(messages, self.message_1 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', ['aa'])])
self.assertEqual(messages, self.message_3)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'in', ['aa', 'b'])])
self.assertEqual(messages, self.message_3 | self.message_2)
messages = self.env['test_new_api.message'].search([('attributes.mytags', 'not in', ['a', 'b'])])
self.assertEqual(messages, self.message_3)
@mute_logger('odoo.fields')
def test_properties_field_search_unaccent(self):
if not self.registry.has_unaccent:
# To enable unaccent feature:
# CREATE EXTENSION unaccent;
raise unittest.SkipTest("unaccent not enabled")
Model = self.env['test_new_api.message']
(self.message_1 | self.message_2).discussion = self.discussion_1
# search on text properties
self.message_1.attributes = [{
'name': 'mychar',
'type': 'char',
'value': 'Hélène',
'definition_changed': True,
}]
self.message_2.attributes = {'mychar': 'Helene'}
result = Model.search([('attributes.mychar', 'ilike', 'Helene')])
self.assertEqual(self.message_1 | self.message_2, result)
result = Model.search([('attributes.mychar', 'ilike', 'hélène')])
self.assertEqual(self.message_1 | self.message_2, result)
result = Model.search([('attributes.mychar', 'not ilike', 'Helene')])
self.assertNotIn(self.message_1, result)
self.assertNotIn(self.message_2, result)
result = Model.search([('attributes.mychar', 'not ilike', 'hélène')])
self.assertNotIn(self.message_1, result)
self.assertNotIn(self.message_2, result)
@mute_logger('odoo.fields')
def test_properties_field_search_orderby_string(self):
"""Test that we can order record by properties string values."""
(self.message_1 | self.message_2 | self.message_3).discussion = self.discussion_1
self.message_1.attributes = [{
'name': 'mychar',
'type': 'char',
'value': 'BB',
'definition_changed': True,
}]
self.message_2.attributes = {'mychar': 'AA'}
self.message_3.attributes = {'mychar': 'CC'}
self.env.flush_all()
result = self.env['test_new_api.message'].search(
domain=[['attributes.mychar', '!=', False]],
order='attributes.mychar ASC')
self.assertEqual(result[0], self.message_2)
self.assertEqual(result[1], self.message_1)
self.assertEqual(result[2], self.message_3)
result = self.env['test_new_api.message'].search(
domain=[['attributes.mychar', '!=', False]],
order='attributes.mychar DESC')
self.assertEqual(result[0], self.message_3)
self.assertEqual(result[1], self.message_1)
self.assertEqual(result[2], self.message_2)
@mute_logger('odoo.fields')
def test_properties_field_search_orderby_integer(self):
"""Test that we can order record by properties integer values."""
(self.message_1 | self.message_2 | self.message_3).discussion = self.discussion_1
self.message_1.attributes = [{
'name': 'myinteger',
'type': 'integer',
'value': 22,
'definition_changed': True,
}]
self.message_2.attributes = {'myinteger': 111}
self.message_3.attributes = {'myinteger': 33}
self.env.flush_all()
result = self.env['test_new_api.message'].search(
domain=[['attributes.myinteger', '!=', False]],
order='attributes.myinteger ASC')
self.assertEqual(result[0], self.message_1)
self.assertEqual(result[1], self.message_3)
self.assertEqual(result[2], self.message_2)
result = self.env['test_new_api.message'].search(
domain=[['attributes.myinteger', '!=', False]],
order='attributes.myinteger DESC')
self.assertEqual(result[0], self.message_2)
self.assertEqual(result[1], self.message_3)
self.assertEqual(result[2], self.message_1)
@mute_logger('odoo.fields')
def test_properties_field_search_orderby_injection(self):
"""Check the restriction on the property name."""
self.message_1.attributes = [{
'name': 'myinteger',
'type': 'integer',
'value': 22,
'definition_changed': True,
}]
for c in '! ()"\'.':
orders = (
f'attributes.myinteger{c} ASC',
f'attributes.my{c}integer ASC',
f'attribut{c}es.myinteger ASC',
)
if c == ' ':
# allow multiple spaces after the property name
orders = orders[1:]
for order in orders:
with self.assertRaises(UserError), self.assertQueryCount(0):
self.env['test_new_api.message'].search(domain=[], order=order)
@mute_logger('odoo.fields')
def test_properties_field_search(self):
with self.assertRaises(ValueError):
self.env['test_new_api.message'].search([('attributes', '=', '"Test"')])
@mute_logger('odoo.fields')
def test_properties_field_search_read_false(self):
Model = self.env['test_new_api.message']
discussion = self.env['test_new_api.discussion'].create({
'name': 'Test Discussion',
'participants': [Command.link(self.user.id)],
})
message = self.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': discussion.id,
'author': self.user.id,
})
discussion.attributes_definition = [{
'name': 'discussion_test',
'string': 'Discussion Test',
'type': 'char',
}]
message_values = Model.search_read([('id', '=', message.id)])
self.assertEqual(message_values[0]['attributes'][0]['value'], False, 'Value should be set as False')
class PropertiesGroupByCase(TestPropertiesMixin):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.message_4 = cls.env['test_new_api.message'].create({
'name': 'Test Message',
'discussion': cls.discussion_1.id,
'author': cls.user.id,
})
cls.messages = cls.message_1 | cls.message_2 | cls.message_3 | cls.message_4
cls.env['test_new_api.message'].search([('id', 'not in', cls.messages.ids)]).unlink()
cls.wrong_discussion_id = cls.env['test_new_api.discussion'].search(
[], order="id DESC", limit=1).id + 1000
@mute_logger('odoo.fields')
def test_properties_field_read_group_basic(self):
Model = self.env['test_new_api.message']
self.messages.discussion = self.discussion_1
# search on text properties
self.message_1.attributes = [{
'name': 'mychar',
'type': 'char',
'value': 'qsd',
'definition_changed': True,
}, {
'name': 'myinteger',
'type': 'integer',
'value': 1337,
'definition_changed': True,
}]
self.message_2.attributes = {'mychar': 'qsd', 'myinteger': 5}
self.message_3.attributes = {'mychar': 'boum', 'myinteger': 1337}
self.env.flush_all()
# group by the char property
with self.assertQueryCount(3):
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mychar'],
)
self.assertEqual(len(result), 3)
# check counts
count_by_values = {
value['attributes.mychar']: value['attributes.mychar_count']
for value in result
}
self.assertEqual(count_by_values['boum'], 1)
self.assertEqual(count_by_values['qsd'], 2)
self.assertEqual(count_by_values[False], 1)
# check domains
domain_by_values = {
value['attributes.mychar']: value['__domain']
for value in result
}
self.assertEqual(domain_by_values['boum'], [('attributes.mychar', '=', 'boum')])
self.assertEqual(domain_by_values['qsd'], [('attributes.mychar', '=', 'qsd')])
self._check_domains_count(result)
# group by the integer property
with self.assertQueryCount(3):
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger'],
)
self.assertEqual(len(result), 3)
count_by_values = {
value['attributes.myinteger']: value['attributes.myinteger_count']
for value in result
}
self.assertEqual(count_by_values[5], 1)
self.assertEqual(count_by_values[1337], 2)
self.assertEqual(count_by_values[False], 1)
# falsy properties
self.message_3.attributes = {'mychar': False, 'myinteger': False}
with self.assertQueryCount(3):
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger'],
)
self.assertEqual(result[-1]['attributes.myinteger_count'], 2)
self.assertEqual(result[-1]['__domain'], [('attributes.myinteger', '=', False)])
self._check_domains_count(result)
# non existing keys in the dict values should be grouped with False value
self.env.cr.execute(
"""
UPDATE test_new_api_message
SET attributes = '{}'
WHERE id = %s
""",
[self.message_2.id],
)
with self.assertQueryCount(3):
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger'],
)
self.assertEqual(result[-1]['attributes.myinteger_count'], 3)
self.assertEqual(result[-1]['__domain'], [('attributes.myinteger', '=', False)])
result = Model.search(result[-1]['__domain']) # check the domain is correct for the search
self.assertEqual(result, self.message_2 | self.message_3 | self.message_4)
# test the order by
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger'],
orderby='attributes.myinteger ASC'
)
self.assertEqual(len(result), 2)
self.assertEqual(result[0]['attributes.myinteger'], 1337)
self.assertEqual(result[1]['attributes.myinteger'], False)
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger'],
orderby='attributes.myinteger DESC'
)
self.assertEqual(len(result), 2)
self.assertEqual(result[0]['attributes.myinteger'], False)
self.assertEqual(result[1]['attributes.myinteger'], 1337)
self._check_domains_count(result)
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger', 'name'],
orderby='attributes.myinteger DESC',
lazy=False,
)
self.assertEqual(
result[0]['__domain'],
['&', ('attributes.myinteger', '=', False), ('name', '=', self.message_1.name)],
)
self._check_domains_count(result)
@mute_logger('odoo.fields')
def test_properties_field_read_progress_bar(self):
"""Test "_read_progress_bar" with a properties field."""
Model = self.env['test_new_api.message']
self.messages.discussion = self.discussion_1
self.message_1.attributes = [{
'name': 'myinteger',
'type': 'integer',
'value': 1337,
'definition_changed': True,
}]
self.message_2.attributes = {'myinteger': 5}
self.message_3.attributes = {'myinteger': 1337}
result = Model.read_progress_bar(
domain=[],
group_by='attributes.myinteger',
progress_bar={'field': 'priority', 'colors': [0]},
)
self.assertEqual(result, {'1337': {0: 2}, '5': {0: 1}, 'False': {0: 1}})
def _properties_field_read_group_date_prepare(self, date_type='date'):
# Format in database: 2023-03-29 09:30:16
self.messages.discussion = self.discussion_1
self.discussion_1.attributes_definition = [{
'type': date_type,
'name': 'mydate',
}]
hour = ' 13:05:34' if date_type == 'datetime' else ''
# message 5 has a different year
# message 6 has a False value
# message 7 is in a different discussion
self.message_5, self.message_6, self.message_7 = self.env['test_new_api.message'].create([
{'discussion': self.discussion_1.id, 'attributes': {'mydate': f'2077-05-02{hour}'}},
{'discussion': self.discussion_1.id, 'attributes': {'mydate': False}},
{'discussion': self.discussion_2.id},
])
self.message_1.attributes = {'mydate': f'2023-01-02{hour}'}
self.message_2.attributes = {'mydate': f'2023-02-03{hour}'}
self.message_3.attributes = {'mydate': f'2023-01-02{hour}'}
self.message_4.attributes = {'mydate': f'2023-02-05{hour}'}
self.env.flush_all()
@mute_logger('odoo.fields')
def test_properties_field_read_group_date_day(self, date_type='date'):
self._properties_field_read_group_date_prepare(date_type)
Model = self.env['test_new_api.message']
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:day'],
orderby='attributes.mydate DESC',
)
self.assertEqual(len(result), 5)
# check values and count
self.assertEqual(result[0]['attributes.mydate_count'], 2)
self.assertEqual(result[0]['attributes.mydate:day'], False)
self.assertEqual(result[1]['attributes.mydate_count'], 1)
self.assertEqual(result[1]['attributes.mydate:day'], '02 May 2077')
self.assertEqual(result[2]['attributes.mydate_count'], 1)
self.assertEqual(result[2]['attributes.mydate:day'], '05 Feb 2023')
self.assertEqual(result[3]['attributes.mydate_count'], 1)
self.assertEqual(result[3]['attributes.mydate:day'], '03 Feb 2023')
self.assertEqual(result[4]['attributes.mydate_count'], 2)
self.assertEqual(result[4]['attributes.mydate:day'], '02 Jan 2023')
# check domain
self.assertEqual(Model.search(result[0]['__domain']), self.message_6 | self.message_7)
self.assertEqual(Model.search(result[1]['__domain']), self.message_5)
self.assertEqual(Model.search(result[2]['__domain']), self.message_4)
self.assertEqual(Model.search(result[3]['__domain']), self.message_2)
self.assertEqual(Model.search(result[4]['__domain']), self.message_1 | self.message_3)
self._check_domains_count(result)
# when the order is not specified, the ORM will generate one
# based on "attributes.mydate ASC", make sure it works
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:year'],
)
self.assertEqual(len(result), 3)
self.assertEqual(result[0]['attributes.mydate:year'], '2023')
self.assertEqual(result[1]['attributes.mydate:year'], '2077')
self.assertEqual(result[2]['attributes.mydate:year'], False)
@mute_logger('odoo.fields')
def test_properties_field_read_group_date_quarter(self, date_type='date'):
self._properties_field_read_group_date_prepare(date_type)
Model = self.env['test_new_api.message']
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:quarter'],
orderby='attributes.mydate DESC',
)
self.assertEqual(len(result), 3)
# check values and count
self.assertEqual(result[0]['attributes.mydate_count'], 2)
self.assertEqual(result[0]['attributes.mydate:quarter'], False)
self.assertEqual(result[1]['attributes.mydate_count'], 1)
self.assertEqual(result[1]['attributes.mydate:quarter'], 'Q2 2077')
self.assertEqual(result[2]['attributes.mydate_count'], 4)
self.assertEqual(result[2]['attributes.mydate:quarter'], 'Q1 2023')
# check domain
self.assertEqual(Model.search(result[0]['__domain']), self.message_6 | self.message_7)
self.assertEqual(Model.search(result[1]['__domain']), self.message_5)
self.assertEqual(
Model.search(result[2]['__domain']),
self.message_1 | self.message_2 | self.message_3 | self.message_4)
self._check_domains_count(result)
@mute_logger('odoo.fields')
def test_properties_field_read_group_date_month(self, date_type='date'):
self._properties_field_read_group_date_prepare()
Model = self.env['test_new_api.message']
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:month'],
orderby='attributes.mydate DESC',
)
self.assertEqual(len(result), 4)
# check values and count
self.assertEqual(result[0]['attributes.mydate_count'], 2)
self.assertEqual(result[0]['attributes.mydate:month'], False)
self.assertEqual(result[1]['attributes.mydate_count'], 1)
self.assertEqual(result[1]['attributes.mydate:month'], 'May 2077')
self.assertEqual(result[2]['attributes.mydate_count'], 2)
self.assertEqual(result[2]['attributes.mydate:month'], 'February 2023')
self.assertEqual(result[3]['attributes.mydate_count'], 2)
self.assertEqual(result[3]['attributes.mydate:month'], 'January 2023')
# check domain
self.assertEqual(Model.search(result[0]['__domain']), self.message_6 | self.message_7)
self.assertEqual(Model.search(result[1]['__domain']), self.message_5)
self.assertEqual(Model.search(result[2]['__domain']), self.message_2 | self.message_4)
self.assertEqual(Model.search(result[3]['__domain']), self.message_1 | self.message_3)
self._check_domains_count(result)
@mute_logger('odoo.fields')
def test_properties_field_read_group_date_week(self, date_type='date'):
first_week_day = int(get_lang(self.env).week_start) - 1
self.assertEqual(first_week_day, 6, "First day of the week must be Sunday")
self._properties_field_read_group_date_prepare()
Model = self.env['test_new_api.message']
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:week'],
orderby='attributes.mydate DESC',
)
self.assertEqual(len(result), 5)
# check values and count
self.assertEqual(result[0]['attributes.mydate_count'], 2)
self.assertEqual(result[0]['attributes.mydate:week'], False)
self.assertEqual(result[1]['attributes.mydate_count'], 1)
self.assertEqual(result[1]['attributes.mydate:week'], 'W19 2077')
self.assertEqual(result[2]['attributes.mydate_count'], 1)
self.assertEqual(result[2]['attributes.mydate:week'], 'W6 2023')
self.assertEqual(result[3]['attributes.mydate_count'], 1)
self.assertEqual(result[3]['attributes.mydate:week'], 'W5 2023')
self.assertEqual(result[4]['attributes.mydate_count'], 2)
# Babel issue mitigation
# https://github.com/python-babel/babel/pull/621 -- introduced a new bug
# https://github.com/python-babel/babel/pull/887 -- proposed a fix but finally closed
# https://sources.debian.org/patches/python-babel/2.10.3-1/ -- Debian reverted 621
# so this ugly fix is made to have the test working in patched and non patched versions of Babel
babel_year = babel.dates.format_date(datetime.datetime(2023, 1, 1), "YYYY", "en_US") # non patched: '2022' patched: '2023'
if babel_year == '2022': # Broken unpatched babel
self.assertEqual(result[4]['attributes.mydate:week'], 'W1 2022')
else: # Patched babel
self.assertEqual(result[4]['attributes.mydate:week'], 'W1 2023')
# check domain
self.assertEqual(Model.search(result[0]['__domain']), self.message_6 | self.message_7)
self.assertEqual(Model.search(result[1]['__domain']), self.message_5)
self.assertEqual(Model.search(result[2]['__domain']), self.message_4)
self.assertEqual(Model.search(result[3]['__domain']), self.message_2)
self.assertEqual(Model.search(result[4]['__domain']), self.message_1 | self.message_3)
self._check_domains_count(result)
# test that the first day of the week in the domain care about the local
# (based on the lang, the first day of the week might change)
for line in result[1:]:
self.assertEqual(line['__domain'][1][1], ">=")
self.assertEqual(line['__domain'][2][1], "<")
start = datetime.datetime.strptime(line['__domain'][1][2], "%Y-%m-%d")
end = datetime.datetime.strptime(line['__domain'][2][2], "%Y-%m-%d")
self.assertEqual(start.weekday(), first_week_day)
self.assertEqual(end.weekday(), first_week_day)
# now, first day of the week is "Wednesday"
lang = self.env['res.lang'].with_context(active_test=False).search([('code', '=', 'fr_FR')])
self.assertEqual(len(lang), 1)
lang.write({'active': True, 'week_start': '3'})
result = Model.with_context(lang='fr_FR').read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:week'],
orderby='attributes.mydate DESC',
)
for line in result[1:]:
self.assertEqual(line['__domain'][1][1], ">=")
self.assertEqual(line['__domain'][2][1], "<")
start = datetime.datetime.strptime(line['__domain'][1][2], "%Y-%m-%d")
end = datetime.datetime.strptime(line['__domain'][2][2], "%Y-%m-%d")
self.assertEqual(start.weekday(), 2, "First day of the week must be Wednesday")
self.assertEqual(end.weekday(), 2, "First day of the week must be Wednesday")
@mute_logger('odoo.fields')
def test_properties_field_read_group_date_year(self, date_type='date'):
self._properties_field_read_group_date_prepare()
Model = self.env['test_new_api.message']
result = Model.read_group(
domain=[],
fields=[],
groupby=['attributes.mydate:year'],
orderby='attributes.mydate DESC',
)
self.assertEqual(len(result), 3)
# check values and count
self.assertEqual(result[0]['attributes.mydate_count'], 2)
self.assertEqual(result[0]['attributes.mydate:year'], False)
self.assertEqual(result[1]['attributes.mydate_count'], 1)
self.assertEqual(result[1]['attributes.mydate:year'], '2077')
self.assertEqual(result[2]['attributes.mydate_count'], 4)
self.assertEqual(result[2]['attributes.mydate:year'], '2023')
# check domain
self.assertEqual(Model.search(result[0]['__domain']), self.message_6 | self.message_7)
self.assertEqual(Model.search(result[1]['__domain']), self.message_5)
self.assertEqual(
Model.search(result[2]['__domain']),
self.message_1 | self.message_2 | self.message_3 | self.message_4)
self._check_domains_count(result)
def test_properties_field_read_group_datetime_day(self):
self.test_properties_field_read_group_date_day('datetime')
def test_properties_field_read_group_datetime_quarter(self):
self.test_properties_field_read_group_date_quarter('datetime')
def test_properties_field_read_group_datetime_month(self):
self.test_properties_field_read_group_date_month('datetime')
def test_properties_field_read_group_datetime_week(self):
self.test_properties_field_read_group_date_week('datetime')
def test_properties_field_read_group_datetime_year(self):
self.test_properties_field_read_group_date_year('datetime')
@mute_logger('odoo.fields')
def test_properties_field_read_group_injection(self):
Model = self.env['test_new_api.message']
self.message_1.attributes = [{
'name': 'myinteger',
'type': 'integer',
'value': 1337,
'definition_changed': True,
}]
self.env.flush_all()
with self.assertRaises(ValueError), self.assertQueryCount(0):
Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger'],
orderby='attributes.myinteger OR 1=1 DESC'
)
with self.assertRaises(ValueError), self.assertQueryCount(0):
Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger OR 1=1'],
orderby='attributes.myinteger DESC'
)
with self.assertRaises(ValueError), self.assertQueryCount(0):
Model.read_group(
domain=[],
fields=[],
groupby=['attributes.myinteger:wrongfunction'],
orderby='attributes.myinteger DESC'
)
with self.assertRaises(ValueError), self.assertQueryCount(0):
Model._read_group(
domain=[],
aggregates=['attributes.myinteger:sum'], # Aggregate is not supported
)
@mute_logger('odoo.fields', 'odoo.models.unlink')
def test_properties_field_read_group_many2many(self):
Model = self.env['test_new_api.message']
partners = self.env['test_new_api.partner'].create([
{'name': f'Partner {i}'}
for i in range(10)
])
self.discussion_1.attributes_definition = [{
'name': 'mypartners',
'string': 'Partners',
'type': 'many2many',
'comodel': 'test_new_api.partner',
}]
self.messages.discussion = self.discussion_1
self.message_1.attributes = {'mypartners': partners[:5].ids}
self.message_2.attributes = {'mypartners': partners[3:8].ids}
self.message_3.attributes = {'mypartners': partners[8:].ids}
(partners[4] | partners[7] | partners[9]).unlink()
with self.assertQueryCount(4):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mypartners'],
lazy=False,
)
self.assertEqual(len(result), 8)
existing_partners = partners.exists()
self.assertEqual(len(existing_partners), 7)
for partner, line in zip(existing_partners, result):
self.assertEqual(partner.id, line['attributes.mypartners'][0])
self.assertEqual(partner.display_name, line['attributes.mypartners'][1])
self.assertEqual(
line['__domain'],
[
'&',
('discussion', '!=', self.wrong_discussion_id),
('attributes.mypartners', 'in', partner.id),
],
)
# only the fourth partner is in 2 messages
self.assertEqual(line['__count'], 2 if partner == partners[3] else 1)
# message 4 is in a different discussion, so it's value is False
self.assertEqual(Model.search(result[-1]['__domain']), self.message_4)
self._check_many_falsy_group('mypartners', result)
self._check_domains_count(result)
# now message 1 and 2 will also be in the falsy group
partners[:8].unlink()
with self.assertQueryCount(4):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mypartners'],
lazy=False,
)
self.assertEqual(len(result), 2)
self.assertEqual(Model.search(result[-1]['__domain']), self.message_1 | self.message_2 | self.message_4)
self._check_many_falsy_group('mypartners', result)
self._check_domains_count(result)
# special case, no partner exists
existing_partners.unlink()
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mypartners'],
lazy=False,
)
self.assertEqual(len(result), 1)
self.assertFalse(result[0]['attributes.mypartners'])
self.assertEqual(result[0]['__count'], 4)
self._check_domains_count(result)
# test an invalid model name (e.g. if we uninstalled the module of the model)
for invalid_model_name in ("invalid_model_name", "test_new_api.transient_model"):
self.env.cr.execute(
"""
UPDATE test_new_api_discussion
SET attributes_definition
= jsonb_set(attributes_definition, '{0,comodel}', %s)
WHERE id = %s
""",
[json.dumps(invalid_model_name), self.discussion_1.id],
) # bypass the ORM to set an invalid model name
definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(definition[0]['comodel'], invalid_model_name)
error_message = f"You cannot use “Partners” because the linked “{invalid_model_name}” model doesn't exist or is invalid"
with self.assertRaisesRegex(UserError, error_message):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mypartners'],
lazy=False,
)
@mute_logger('odoo.fields')
def test_properties_field_read_group_many2one(self):
Model = self.env['test_new_api.message']
# group by many2one property
self.message_1.attributes = [{
'name': 'mypartner',
'string': 'My Partner',
'type': 'many2one',
'value': self.partner_2.id,
'comodel': 'test_new_api.partner',
'definition_changed': True,
}]
self.message_2.attributes = {'mypartner': self.partner.id}
self.message_4.attributes = {'mypartner': False} # explicit False value
# this partner id doesn't exist
unexisting_record_id = self.env['test_new_api.partner'].search(
[], order="id DESC", limit=1).id + 1
self.env.cr.execute(
"""
UPDATE test_new_api_message
SET attributes = '{"mypartner": %s}'
WHERE id = %s
""",
[unexisting_record_id, self.message_3.id],
)
with self.assertQueryCount(3):
result = Model.read_group(
domain=[],
fields=['name', 'attributes', 'discussion'],
groupby=['attributes.mypartner'],
)
self.assertEqual(len(result), 3, 'Should ignore the partner that has been removed')
self.assertEqual(result[0]['attributes.mypartner_count'], 1)
self.assertEqual(result[0]['attributes.mypartner'][0], self.partner.id)
self.assertEqual(result[0]['attributes.mypartner'][1], self.partner.display_name)
self.assertEqual(result[0]['__domain'], [('attributes.mypartner', '=', self.partner.id)])
self.assertEqual(result[1]['attributes.mypartner_count'], 1)
self.assertEqual(result[1]['attributes.mypartner'][0], self.partner_2.id)
self.assertEqual(result[1]['attributes.mypartner'][1], self.partner_2.display_name)
self.assertEqual(result[1]['__domain'], [('attributes.mypartner', '=', self.partner_2.id)])
# falsy domain, automatically generated, contains the false value
# and the ids of the records that doesn't exist in the database
self.assertEqual(result[2]['attributes.mypartner_count'], 2)
self.assertEqual(result[2]['attributes.mypartner'], False)
self.assertEqual(
result[2]['__domain'],
[
'|',
('attributes.mypartner', '=', False),
('attributes.mypartner', 'not in', (self.partner.id, self.partner_2.id)),
],
)
# when there's no "('property', '=', False)" domain, it should be created
self.message_4.attributes = {'mypartner': self.partner.id}
result = Model.read_group(
domain=[],
fields=['name', 'attributes', 'discussion'],
groupby=['attributes.mypartner'],
)
self.assertEqual(result[2]['attributes.mypartner_count'], 1)
self.assertEqual(
result[2]['__domain'],
[
'|',
('attributes.mypartner', '=', False),
('attributes.mypartner', 'not in', (self.partner.id, self.partner_2.id)),
],
)
# test an invalid model name (e.g. if we uninstalled the module of the model)
# should have only one group with the value "False", and all records
for invalid_model_name in ("invalid_model_name", "test_new_api.transient_model"):
self.env.cr.execute(
"""
UPDATE test_new_api_discussion
SET attributes_definition
= jsonb_set(attributes_definition, '{0,comodel}', %s::jsonb)
WHERE id = %s
""",
[json.dumps(invalid_model_name), self.discussion_1.id],
) # bypass the ORM to set an invalid model name
definition = self._get_sql_definition(self.discussion_1)
self.assertEqual(definition[0]['comodel'], invalid_model_name)
error_message = f"You cannot use “My Partner” because the linked “{invalid_model_name}” model doesn't exist or is invalid"
with self.assertRaisesRegex(UserError, error_message):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mypartner'],
lazy=False,
)
@mute_logger('odoo.fields')
def test_properties_field_read_group_selection(self):
Model = self.env['test_new_api.message']
# group by selection property
self.message_1.attributes = [{
'name': 'myselection',
'type': 'selection',
'value': 'optionA',
'selection': [['optionA', 'A'], ['optionB', 'B']],
'definition_changed': True,
}, {
'name': 'mychar2',
'type': 'char',
'value': 'qsd',
'definition_changed': True,
}]
self.message_2.attributes = {'myselection': False}
self.env.cr.execute(
"""
UPDATE test_new_api_message
SET attributes = '{"myselection": "invalid_option"}'
WHERE id = %s
""",
[self.message_3.id],
)
with self.assertQueryCount(3):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=['name', 'attributes', 'discussion'],
groupby=['attributes.myselection'],
)
self.assertEqual(len(result), 2)
self.assertEqual(result[0]['attributes.myselection_count'], 1)
self.assertEqual(
result[0]['__domain'],
[
'&',
('discussion', '!=', self.wrong_discussion_id), # original domain should be preserved
('attributes.myselection', '=', 'optionA'),
],
)
self.assertEqual(result[0]['attributes.myselection'], 'optionA')
# check that the option that is not valid is included in the "False" domain
# the count should be updated as well
self.assertEqual(result[1]['attributes.myselection_count'], 3)
self.assertEqual(
result[1]['__domain'],
[
'&',
('discussion', '!=', self.wrong_discussion_id),
'|',
('attributes.myselection', '=', False),
('attributes.myselection', 'not in', ('optionA', 'optionB')),
],
)
self.assertEqual(result[1]['attributes.myselection'], False)
# double check that the returned domain filter the right record
self.assertEqual(
self.env['test_new_api.message'].search(result[1]['__domain']),
self.message_2 | self.message_3 | self.message_4,
)
# special case, there's no option
self.message_1.attributes = [{
'name': 'myselection',
'type': 'selection',
'value': 'optionA',
'selection': [],
'definition_changed': True,
}]
self.env.flush_all()
result = Model.read_group(
domain=[],
fields=['attributes'],
groupby=['attributes.myselection'],
)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['attributes.myselection_count'], 4)
self._check_domains_count(result)
@mute_logger('odoo.fields')
def test_properties_field_read_group_tags(self):
Model = self.env['test_new_api.message']
(self.message_1 | self.message_2 | self.message_3).discussion = self.discussion_1
# group by tags property
self.message_1.attributes = [{
'name': 'mytags',
'type': 'tags',
'value': ['a', 'c', 'g'],
'tags': [[x.lower(), x, i] for i, x in enumerate('ABCDEFG')],
'definition_changed': True,
}]
self.message_2.attributes = {'mytags': ['a', 'e', 'g']}
self.env.cr.execute(
"""
UPDATE test_new_api_message
SET attributes = '{"mytags": ["a", "d", "invalid", "e"]}'
WHERE id = %s
""",
[self.message_3.id],
)
self.env.invalidate_all()
with self.assertQueryCount(3):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mytags'],
lazy=False,
)
self.assertNotIn('invalid', str(result))
self.assertEqual(len(result), 6)
all_tags = self.message_1.read(['attributes'])[0]['attributes'][0]['tags']
all_tags = {tag[0]: tag for tag in all_tags}
for group, (tag, count) in zip(result, (('a', 3), ('c', 1), ('d', 1), ('e', 2), ('g', 2))):
self.assertEqual(group['attributes.mytags'], all_tags[tag])
self.assertEqual(group['__count'], count)
self.assertEqual(
group['__domain'],
[
'&',
('discussion', '!=', self.wrong_discussion_id),
('attributes.mytags', 'in', tag),
],
)
# check that the value when we read the record match the value of the group
property_values = [
next(pro['value'] for pro in values['attributes'])
for values in Model.search(group['__domain']).read(['attributes'])
]
self.assertTrue(all(tag in property_value for property_value in property_values))
self.assertEqual(Model.search(result[-1]['__domain']), self.message_4)
self._check_many_falsy_group('mytags', result)
self._check_domains_count(result)
# now message 3 has *only* invalid tags, so it should be in the falsy group
self.env.cr.execute(
"""
UPDATE test_new_api_message
SET attributes = '{"mytags": ["invalid 1", "invalid 2", "invalid 3"]}'
WHERE id = %s
""",
[self.message_3.id],
)
self.env.invalidate_all()
with self.assertQueryCount(3):
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mytags'],
lazy=False,
)
self.assertEqual(Model.search(result[-1]['__domain']), self.message_3 | self.message_4)
self._check_many_falsy_group('mytags', result)
self._check_domains_count(result)
# special case, there's no tag
for tags in ([], False, None):
self.message_1.attributes = [{
'name': 'mytags',
'type': 'tags',
'value': tags,
'tags': tags,
'definition_changed': True,
}]
self.env.flush_all()
result = Model.read_group(
domain=[('discussion', '!=', self.wrong_discussion_id)],
fields=[],
groupby=['attributes.mytags'],
lazy=False,
)
self.assertEqual(len(result), 1)
self.assertFalse(result[0]['attributes.mytags'])
self.assertEqual(result[0]['__count'], 4)
self._check_domains_count(result)
def _check_domains_count(self, result):
"""Check that the domains in the result match the __count key."""
for line in result:
records = self.env['test_new_api.message'].search(line['__domain'])
count_key = next(key for key in line if "_count" in key)
self.assertEqual(len(records), line[count_key])
def _check_many_falsy_group(self, property_name, result):
"""Check the falsy group from the many2many and tags read group result.
- if a record is in the falsy group, it can't be in a other group
(that sentence is not true for non-falsy group, a record
with a non-falsy value can be in many other groups)
- read the value of all records and check if they belongs to the correct group
"""
Model = self.env['test_new_api.message']
falsy_group = result[-1]
self.assertFalse(falsy_group[f'attributes.{property_name}'])
falsy_records = Model.search(falsy_group['__domain'])
nonfalsy_records = Model.search(expression.OR([line['__domain'] for line in result[:-1]]))
self.assertEqual(Model.search_count([]), len(falsy_records) + len(nonfalsy_records))
for falsy_record in falsy_records:
self.assertNotIn(falsy_record, nonfalsy_records)
def _get_records_values(records):
return [
next(
(pro['value'] for pro in properties['attributes']
if pro['name'] == property_name),
)
for properties in records.read(['attributes'])
]
self.assertTrue(not any(_get_records_values(falsy_records)))
self.assertTrue(all(_get_records_values(nonfalsy_records)))