# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. # # test cases for new-style fields # import base64 import json from collections import OrderedDict from datetime import date, datetime, time import io from PIL import Image from unittest.mock import patch import psycopg2 import threading from odoo import models, fields, Command from odoo.addons.base.tests.common import TransactionCaseWithUserDemo from odoo.addons.base.tests.test_expression import TransactionExpressionCase from odoo.exceptions import AccessError, MissingError, UserError, ValidationError from odoo.tests import TransactionCase, tagged, Form, users from odoo.tools import mute_logger, float_repr from odoo.tools.date_utils import add, subtract, start_of, end_of from odoo.tools.image import image_data_uri class TestFields(TransactionCaseWithUserDemo, TransactionExpressionCase): def setUp(self): # for tests methods that create custom models/fields self.addCleanup(self.registry.reset_changes) self.addCleanup(self.registry.clear_all_caches) super(TestFields, self).setUp() self.env.ref('test_new_api.discussion_0').write({'participants': [Command.link(self.user_demo.id)]}) # YTI FIX ME: The cache shouldn't be inconsistent (rco is gonna fix it) # self.env.ref('test_new_api.discussion_0').participants -> 1 user # self.env.ref('test_new_api.discussion_0').invalidate() # self.env.ref('test_new_api.discussion_0').with_context(active_test=False).participants -> 2 users self.env.ref('test_new_api.message_0_1').write({'author': self.user_demo.id}) def test_00_basics(self): """ test accessing new fields """ # find a discussion discussion = self.env.ref('test_new_api.discussion_0') # read field as a record attribute or as a record item self.assertIsInstance(discussion.name, str) self.assertIsInstance(discussion['name'], str) self.assertEqual(discussion['name'], discussion.name) # read it with method read() values = discussion.read(['name'])[0] self.assertEqual(values['name'], discussion.name) def test_01_basic_get_assertion(self): """ test item getter """ # field access works on single record record = self.env.ref('test_new_api.message_0_0') self.assertEqual(len(record), 1) ok = record.body # field access fails on multiple records records = self.env['test_new_api.message'].search([]) assert len(records) > 1 with self.assertRaises(ValueError): faulty = records.body def test_01_basic_set_assertion(self): """ test item setter """ # field assignment works on single record record = self.env.ref('test_new_api.message_0_0') self.assertEqual(len(record), 1) record.body = 'OK' # field assignment on multiple records should assign value to all records records = self.env['test_new_api.message'].search([]) records.body = 'Updated' self.assertTrue(all(map(lambda record:record.body=='Updated', records))) # field assigmenent does not cache the wrong value when write overridden record.priority = 4 self.assertEqual(record.priority, 5) def test_05_unknown_fields(self): """ test ORM operations with unknown fields """ cat = self.env['test_new_api.category'].create({'name': 'Foo'}) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.search([('zzz', '=', 42)]) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.search([], order='zzz') with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.read(['zzz']) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.read_group([('zzz', '=', 42)], fields=['color'], groupby=['parent']) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.read_group([], fields=['zzz'], groupby=['parent']) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.read_group([], fields=['zzz:sum'], groupby=['parent']) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.read_group([], fields=['color'], groupby=['zzz']) with self.assertRaisesRegex(ValueError, 'is not a valid aggregate'): cat.read_group([], fields=['color'], groupby=['parent'], orderby='zzz') # exception: accept '__count' as field to aggregate cat.read_group([], fields=['__count'], groupby=['parent']) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.create({'name': 'Foo', 'zzz': 42}) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.write({'zzz': 42}) with self.assertRaisesRegex(ValueError, 'Invalid field'): cat.new({'name': 'Foo', 'zzz': 42}) def test_10_computed(self): """ check definition of computed fields """ # by default function fields are not stored, readonly, not copied field = self.env['test_new_api.message']._fields['size'] self.assertFalse(field.store) self.assertFalse(field.compute_sudo) self.assertTrue(field.readonly) self.assertFalse(field.copy) field = self.env['test_new_api.message']._fields['name'] self.assertTrue(field.store) self.assertTrue(field.compute_sudo) self.assertTrue(field.readonly) self.assertFalse(field.copy) # stored editable computed fields are copied according to their type field = self.env['test_new_api.compute.onchange']._fields['baz'] self.assertTrue(field.store) self.assertTrue(field.compute_sudo) self.assertFalse(field.readonly) self.assertTrue(field.copy) field = self.env['test_new_api.compute.onchange']._fields['line_ids'] self.assertTrue(field.store) self.assertTrue(field.compute_sudo) self.assertFalse(field.readonly) self.assertFalse(field.copy) # like a regular one2many field field = self.env['test_new_api.compute.onchange']._fields['tag_ids'] self.assertTrue(field.store) self.assertTrue(field.compute_sudo) self.assertFalse(field.readonly) self.assertTrue(field.copy) # like a regular many2many field def test_10_computed_custom(self): """ check definition of custom computed fields """ # Flush demo user before creating a new ir.model.fields to avoid # a deadlock self.env.flush_all() self.env['ir.model.fields'].create({ 'name': 'x_bool_false_computed', 'model_id': self.env.ref('test_new_api.model_test_new_api_message').id, 'field_description': 'A boolean computed to false', 'compute': "for r in self: r['x_bool_false_computed'] = False", 'store': False, 'ttype': 'boolean' }) field = self.env['test_new_api.message']._fields['x_bool_false_computed'] self.assertFalse(self.registry.field_depends[field]) def test_10_computed_custom_invalid_transitive_depends(self): self.patch(type(self.env["ir.model.fields"]), "_check_depends", lambda self: True) self.env["ir.model.fields"].create( { "name": "x_computed_custom_valid_depends", "model_id": self.env.ref("test_new_api.model_test_new_api_foo").id, "state": "manual", "field_description": "A compute with a valid depends", "compute": "for r in self: r['x_computed_custom_valid_depends'] = False", "depends": "value1", "store": False, "ttype": "boolean", } ) self.env["ir.model.fields"].create( { "name": "x_computed_custom_valid_transitive_depends", "model_id": self.env.ref("test_new_api.model_test_new_api_foo").id, "state": "manual", "field_description": "A compute with a valid transitive depends", "compute": "for r in self: r['x_computed_custom_valid_transitive_depends'] = False", "depends": "x_computed_custom_valid_depends", "store": False, "ttype": "boolean", } ) self.env["ir.model.fields"].create( { "name": "x_computed_custom_invalid_depends", "model_id": self.env.ref("test_new_api.model_test_new_api_foo").id, "state": "manual", "field_description": "A compute with an invalid depends", "compute": "for r in self: r['x_computed_custom_invalid_depends'] = False", "depends": "bar", "store": False, "ttype": "boolean", } ) self.env["ir.model.fields"].create( { "name": "x_computed_custom_invalid_transitive_depends", "model_id": self.env.ref("test_new_api.model_test_new_api_foo").id, "state": "manual", "field_description": "A compute with an invalid transitive depends", "compute": "for r in self: r['x_computed_custom_invalid_transitive_depends'] = False", "depends": "x_computed_custom_invalid_depends", "store": False, "ttype": "boolean", } ) fields = self.env["test_new_api.foo"]._fields get_trigger_tree = self.registry.get_trigger_tree value1 = fields["value1"] valid_depends = fields["x_computed_custom_valid_depends"] valid_transitive_depends = fields["x_computed_custom_valid_transitive_depends"] invalid_depends = fields["x_computed_custom_invalid_depends"] invalid_transitive_depends = fields["x_computed_custom_invalid_transitive_depends"] # `x_computed_custom_valid_depends` in the triggers of the field `value1` self.assertTrue(valid_depends in get_trigger_tree([value1]).root) # `x_computed_custom_valid_transitive_depends` in the triggers `x_computed_custom_valid_depends` and `value1` self.assertTrue(valid_transitive_depends in get_trigger_tree([valid_depends]).root) self.assertTrue(valid_transitive_depends in get_trigger_tree([value1]).root) # `x_computed_custom_invalid_depends` not in any triggers, as it was invalid and was skipped self.assertEqual( sum(invalid_depends in get_trigger_tree([field]).root for field in fields.values()), 0 ) # `x_computed_custom_invalid_transitive_depends` in the triggers of `x_computed_custom_invalid_depends` only self.assertTrue(invalid_transitive_depends in get_trigger_tree([invalid_depends]).root) self.assertEqual( sum(invalid_transitive_depends in get_trigger_tree([field]).root for field in fields.values()), 1 ) @mute_logger('odoo.fields') def test_10_computed_stored_x_name(self): # create a custom model with two fields self.env["ir.model"].create({ "name": "x_test_10_compute_store_x_name", "model": "x_test_10_compute_store_x_name", "field_id": [ (0, 0, {'name': 'x_name', 'ttype': 'char'}), (0, 0, {'name': 'x_stuff_id', 'ttype': 'many2one', 'relation': 'ir.model'}), ], }) self.env.invalidate_all() # set 'x_stuff_id' refer to a model not loaded yet self.cr.execute(""" UPDATE ir_model_fields SET relation = 'not.loaded' WHERE model = 'x_test_10_compute_store_x_name' AND name = 'x_stuff_id' """) # set 'x_name' be computed and depend on 'x_stuff_id' self.cr.execute(""" UPDATE ir_model_fields SET compute = 'pass', depends = 'x_stuff_id.x_custom_1' WHERE model = 'x_test_10_compute_store_x_name' AND name = 'x_name' """) # setting up models should not crash self.registry.setup_models(self.cr) def test_10_display_name(self): """ test definition of automatic field 'display_name' """ field = type(self.env['test_new_api.discussion']).display_name self.assertTrue(field.automatic) self.assertTrue(field.compute) self.assertEqual(self.registry.field_depends[field], ('name',)) def test_10_non_stored(self): """ test non-stored fields """ # a field declared with store=False should not have a column field = self.env['test_new_api.category']._fields['dummy'] self.assertFalse(field.store) self.assertFalse(field.compute) self.assertFalse(field.inverse) # find messages for message in self.env['test_new_api.message'].search([]): # check definition of field self.assertEqual(message.size, len(message.body or '')) # check recomputation after record is modified size = message.size message.write({'body': (message.body or '') + "!!!"}) self.assertEqual(message.size, size + 3) # create a message, assign body, and check size in several environments message1 = self.env['test_new_api.message'].create({}) message2 = message1.with_user(self.user_demo) self.assertEqual(message1.size, 0) self.assertEqual(message2.size, 0) message1.write({'body': "XXX"}) self.assertEqual(message1.size, 3) self.assertEqual(message2.size, 3) # special case: computed field without dependency must be computed record = self.env['test_new_api.mixed'].create({}) self.assertTrue(record.now) def test_11_stored(self): """ test stored fields """ def check_stored(disc): """ Check the stored computed field on disc.messages """ for msg in disc.messages: self.assertEqual(msg.name, "[%s] %s" % (disc.name, msg.author.name)) # find the demo discussion, and check messages discussion1 = self.env.ref('test_new_api.discussion_0') self.assertTrue(discussion1.messages) check_stored(discussion1) # modify discussion name, and check again messages discussion1.name = 'Talking about stuff...' check_stored(discussion1) # switch message from discussion, and check again # See YTI FIXME self.env.invalidate_all() discussion2 = discussion1.copy({'name': 'Another discussion'}) message2 = discussion1.messages[0] message2.discussion = discussion2 check_stored(discussion2) # create a new discussion with messages, and check their name user_root = self.env.ref('base.user_root') user_demo = self.user_demo discussion3 = self.env['test_new_api.discussion'].create({ 'name': 'Stuff', 'participants': [Command.link(user_root.id), Command.link(user_demo.id)], 'messages': [ Command.create({'author': user_root.id, 'body': 'one'}), Command.create({'author': user_demo.id, 'body': 'two'}), Command.create({'author': user_root.id, 'body': 'three'}), ], }) check_stored(discussion3) # modify the discussion messages: edit the 2nd one, remove the last one # (keep modifications in that order, as they reproduce a former bug!) discussion3.write({ 'messages': [ Command.link(discussion3.messages[0].id), Command.update(discussion3.messages[1].id, {'author': user_root.id}), Command.delete(discussion3.messages[2].id), ], }) check_stored(discussion3) def test_11_stored_protected(self): """ test protection against recomputation """ model = self.env['test_new_api.compute.readonly'] field = model._fields['bar'] record = model.create({'foo': 'unprotected #1'}) self.assertEqual(record.bar, 'unprotected #1') record.write({'foo': 'unprotected #2'}) self.assertEqual(record.bar, 'unprotected #2') # by protecting 'bar', we prevent it from being recomputed with self.env.protecting([field], record): record.write({'foo': 'protected'}) self.assertEqual(record.bar, 'unprotected #2') # also works when nested with self.env.protecting([field], record): record.write({'foo': 'protected'}) self.assertEqual(record.bar, 'unprotected #2') record.write({'foo': 'protected'}) self.assertEqual(record.bar, 'unprotected #2') record.write({'foo': 'unprotected #3'}) self.assertEqual(record.bar, 'unprotected #3') # also works with duplicated fields with self.env.protecting([field, field], record): record.write({'foo': 'protected'}) self.assertEqual(record.bar, 'unprotected #3') record.write({'foo': 'unprotected #4'}) self.assertEqual(record.bar, 'unprotected #4') # we protect 'bar' on a different record with self.env.protecting([field], record): record2 = model.create({'foo': 'unprotected'}) self.assertEqual(record2.bar, 'unprotected') def test_11_computed_access(self): """ test computed fields with access right errors """ User = self.env['res.users'] user1 = User.create({'name': 'Aaaah', 'login': 'a'}) user2 = User.create({'name': 'Boooh', 'login': 'b'}) user3 = User.create({'name': 'Crrrr', 'login': 'c'}) # add a rule to not give access to user2 self.env['ir.rule'].create({ 'model_id': self.env['ir.model'].search([('model', '=', 'res.users')]).id, 'domain_force': "[('id', '!=', %d)]" % user2.id, }) # DLE P72: Since we decided that we do not raise security access errors for data to which we had the occassion # to put the value in the cache, we need to invalidate the cache for user1, user2 and user3 in order # to test the below access error. Otherwise the above create calls set in the cache the information needed # to compute `company_type` ('is_company'), and doesn't need to trigger a read. # We need to force the read in order to test the security access self.env.invalidate_all() # group users as a recordset, and read them as user demo users = (user1 + user2 + user3).with_user(self.user_demo) user1, user2, user3 = users # regression test: a bug invalidated the field's value from cache user1.company_type with self.assertRaises(AccessError): user2.company_type user3.company_type def test_12_recursive(self): """ test recursively dependent fields """ Category = self.env['test_new_api.category'] abel = Category.create({'name': 'Abel'}) beth = Category.create({'name': 'Bethany'}) cath = Category.create({'name': 'Catherine'}) dean = Category.create({'name': 'Dean'}) ewan = Category.create({'name': 'Ewan'}) finn = Category.create({'name': 'Finnley'}) gabe = Category.create({'name': 'Gabriel'}) cath.parent = finn.parent = gabe abel.parent = beth.parent = cath dean.parent = ewan.parent = finn self.assertEqual(abel.display_name, "Gabriel / Catherine / Abel") self.assertEqual(beth.display_name, "Gabriel / Catherine / Bethany") self.assertEqual(cath.display_name, "Gabriel / Catherine") self.assertEqual(dean.display_name, "Gabriel / Finnley / Dean") self.assertEqual(ewan.display_name, "Gabriel / Finnley / Ewan") self.assertEqual(finn.display_name, "Gabriel / Finnley") self.assertEqual(gabe.display_name, "Gabriel") ewan.parent = cath self.assertEqual(ewan.display_name, "Gabriel / Catherine / Ewan") cath.parent = finn self.assertEqual(ewan.display_name, "Gabriel / Finnley / Catherine / Ewan") def test_12_recursive_recompute(self): """ test recomputation on recursively dependent field """ a = self.env['test_new_api.recursive'].create({'name': 'A'}) b = self.env['test_new_api.recursive'].create({'name': 'B', 'parent': a.id}) c = self.env['test_new_api.recursive'].create({'name': 'C', 'parent': b.id}) d = self.env['test_new_api.recursive'].create({'name': 'D', 'parent': c.id}) self.assertEqual(a.full_name, 'A') self.assertEqual(b.full_name, 'A / B') self.assertEqual(c.full_name, 'A / B / C') self.assertEqual(d.full_name, 'A / B / C / D') self.assertEqual(a.display_name, 'A') self.assertEqual(b.display_name, 'A / B') self.assertEqual(c.display_name, 'A / B / C') self.assertEqual(d.display_name, 'A / B / C / D') a.name = 'A1' self.assertEqual(a.full_name, 'A1') self.assertEqual(b.full_name, 'A1 / B') self.assertEqual(c.full_name, 'A1 / B / C') self.assertEqual(d.full_name, 'A1 / B / C / D') self.assertEqual(a.display_name, 'A1') self.assertEqual(b.display_name, 'A1 / B') self.assertEqual(c.display_name, 'A1 / B / C') self.assertEqual(d.display_name, 'A1 / B / C / D') b.parent = False self.assertEqual(a.full_name, 'A1') self.assertEqual(b.full_name, 'B') self.assertEqual(c.full_name, 'B / C') self.assertEqual(d.full_name, 'B / C / D') self.assertEqual(a.display_name, 'A1') self.assertEqual(b.display_name, 'B') self.assertEqual(c.display_name, 'B / C') self.assertEqual(d.display_name, 'B / C / D') # rename several records to trigger several recomputations at once (d + c + b).write({'name': 'X'}) self.assertEqual(a.full_name, 'A1') self.assertEqual(b.full_name, 'X') self.assertEqual(c.full_name, 'X / X') self.assertEqual(d.full_name, 'X / X / X') self.assertEqual(a.display_name, 'A1') self.assertEqual(b.display_name, 'X') self.assertEqual(c.display_name, 'X / X') self.assertEqual(d.display_name, 'X / X / X') # delete b; both c and d are deleted in cascade; c should also be marked # to recompute, but recomputation should not fail... b.unlink() self.assertEqual((a + b + c + d).exists(), a) def test_12_recursive_tree(self): foo = self.env['test_new_api.recursive.tree'].create({'name': 'foo'}) self.assertEqual(foo.display_name, 'foo()') bar = foo.create({'name': 'bar', 'parent_id': foo.id}) self.assertEqual(foo.display_name, 'foo(bar())') baz = foo.create({'name': 'baz', 'parent_id': bar.id}) self.assertEqual(foo.display_name, 'foo(bar(baz()))') def test_12_recursive_unlink(self): order = self.env['test_new_api.recursive.order'].create({'value': 42}) line = self.env['test_new_api.recursive.line'].create({'order_id': order.id}) task = self.env['test_new_api.recursive.task'].create({'value': 42}) self.assertEqual(task.line_id, line) self.assertEqual(line.task_ids, task) self.assertTrue(line.task_number) # Before deleting order, the following are marked to recompute: # - task.line_id (recursive, depends on task.line_id.order_id.value) # - line.task_number (implicitely depends on line.task_ids.line_id) # # If task.line_id is ever recomputed in order to mark line.task_number, # its recomputed value will be lost in the cache invalidation, and # there will be nothing left to write in the database afterwards! This # makes the call to unlink() crash in that case. # order.unlink() def test_12_recursive_context_dependent(self): a = self.env['test_new_api.recursive'].create({'name': 'A'}) b = self.env['test_new_api.recursive'].create({'name': 'B', 'parent': a.id}) c = self.env['test_new_api.recursive'].create({'name': 'C', 'parent': b.id}) d = self.env['test_new_api.recursive'].create({'name': 'D', 'parent': c.id}) self.assertEqual(a.context_dependent_name, 'A') self.assertEqual(b.context_dependent_name, 'A / B') self.assertEqual(c.context_dependent_name, 'A / B / C') self.assertEqual(d.context_dependent_name, 'A / B / C / D') # now let's swith to another context to update the dependency a.with_context(bozo=42).name = 'A1' self.assertEqual(a.context_dependent_name, 'A1') self.assertEqual(b.context_dependent_name, 'A1 / B') self.assertEqual(c.context_dependent_name, 'A1 / B / C') self.assertEqual(d.context_dependent_name, 'A1 / B / C / D') def test_12_cascade(self): """ test computed field depending on computed field """ message = self.env.ref('test_new_api.message_0_0') self.env.invalidate_all() double_size = message.double_size self.assertEqual(double_size, message.size) record = self.env['test_new_api.cascade'].create({'foo': "Hi"}) self.assertEqual(record.baz, "<[Hi]>") record.foo = "Ho" self.assertEqual(record.baz, "<[Ho]>") def test_12_unlink_cascade_active_store(self): """ Test that `unlink` on many records doesn't raise a RecursionError with a stored related `active` field. """ message = self.env['test_new_api.message'].create({ 'active': False, }) self.env['test_new_api.emailmessage'].create( [{'message': message.id}] * 101, ) message.unlink() def test_12_unlink_cascade_ir_rule_using_related(self): """ Test that `unlink` on many records doesn't raise a RecursionError when there is an ir.rule with a stored related field to compute. """ message = self.env['test_new_api.message'].create({ 'active': False, }) self.env['test_new_api.emailmessage'].create( [{'message': message.id}] * 101, ) # Create an ir.rule, which forces to flush field 'active' self.env['ir.rule'].create({ 'model_id': self.env['ir.model']._get_id('test_new_api.emailmessage'), 'groups': [self.env.ref('base.group_user').id], 'domain_force': str([('active', '=', False)]), }) message.with_user(self.user_demo).unlink() def test_12_dynamic_depends(self): Model = self.registry['test_new_api.compute.dynamic.depends'] self.assertEqual(self.registry.field_depends[Model.full_name], ()) # the dependencies of full_name are stored in a config parameter self.env['ir.config_parameter'].set_param('test_new_api.full_name', 'name1,name2') # this must re-evaluate the field's dependencies self.env.flush_all() self.registry.setup_models(self.cr) self.assertEqual(self.registry.field_depends[Model.full_name], ('name1', 'name2')) def test_12_one2many_reference_domain(self): model = self.env['test_new_api.inverse_m2o_ref'] o2m_field = model._fields['model_ids'] self.assertEqual(o2m_field.get_domain_list(model), [('res_model', '=', model._name)]) o2m_field = model._fields['model_computed_ids'] self.assertEqual(o2m_field.get_domain_list(model), []) def test_13_inverse(self): """ test inverse computation of fields """ Category = self.env['test_new_api.category'] abel = Category.create({'name': 'Abel'}) beth = Category.create({'name': 'Bethany'}) cath = Category.create({'name': 'Catherine'}) dean = Category.create({'name': 'Dean'}) ewan = Category.create({'name': 'Ewan'}) finn = Category.create({'name': 'Finnley'}) gabe = Category.create({'name': 'Gabriel'}) self.assertEqual(ewan.display_name, "Ewan") ewan.display_name = "Abel / Bethany / Catherine / Erwan" self.assertEqual(beth.parent, abel) self.assertEqual(cath.parent, beth) self.assertEqual(ewan.parent, cath) self.assertEqual(ewan.name, "Erwan") # check create/write with several records vals = {'name': 'None', 'display_name': 'Foo'} foo1, foo2 = Category.create([vals, vals]) self.assertEqual(foo1.name, 'Foo') self.assertEqual(foo2.name, 'Foo') (foo1 + foo2).write({'display_name': 'Bar'}) self.assertEqual(foo1.name, 'Bar') self.assertEqual(foo2.name, 'Bar') # create/write on 'foo' should only invoke the compute method log = [] model = self.env['test_new_api.compute.inverse'].with_context(log=log) record = model.create({'foo': 'Hi'}) self.assertEqual(record.foo, 'Hi') self.assertEqual(record.bar, 'Hi') self.assertCountEqual(log, ['compute']) log.clear() record.write({'foo': 'Ho'}) self.assertEqual(record.foo, 'Ho') self.assertEqual(record.bar, 'Ho') self.assertCountEqual(log, ['compute']) # create/write on 'bar' should only invoke the inverse method log.clear() record = model.create({'bar': 'Hi'}) self.assertEqual(record.foo, 'Hi') self.assertEqual(record.bar, 'Hi') self.assertCountEqual(log, ['inverse']) log.clear() record.write({'bar': 'Ho'}) self.assertEqual(record.foo, 'Ho') self.assertEqual(record.bar, 'Ho') self.assertCountEqual(log, ['inverse']) # Test compatibility multiple compute/inverse fields log = [] model = self.env['test_new_api.multi_compute_inverse'].with_context(log=log) record = model.create({ 'bar1': '1', 'bar2': '2', 'bar3': '3', }) self.assertEqual(record.foo, '1/2/3') self.assertEqual(record.bar1, '1') self.assertEqual(record.bar2, '2') self.assertEqual(record.bar3, '3') self.assertCountEqual(log, ['inverse1', 'inverse23']) log.clear() record.write({'bar2': '4', 'bar3': '5'}) self.assertEqual(record.foo, '1/4/5') self.assertEqual(record.bar1, '1') self.assertEqual(record.bar2, '4') self.assertEqual(record.bar3, '5') self.assertCountEqual(log, ['inverse23']) log.clear() record.write({'bar1': '6', 'bar2': '7'}) self.assertEqual(record.foo, '6/7/5') self.assertEqual(record.bar1, '6') self.assertEqual(record.bar2, '7') self.assertEqual(record.bar3, '5') self.assertCountEqual(log, ['inverse1', 'inverse23']) log.clear() record.write({'foo': 'A/B/C'}) self.assertEqual(record.foo, 'A/B/C') self.assertEqual(record.bar1, 'A') self.assertEqual(record.bar2, 'B') self.assertEqual(record.bar3, 'C') self.assertCountEqual(log, ['compute']) # corner case: write on a field that is marked to compute log.clear() # writing on 'foo' marks 'bar1', 'bar2', 'bar3' to compute record.write({'foo': '1/2/3'}) self.assertCountEqual(log, []) # writing on 'bar3' must force the computation before updating record.write({'bar3': 'X'}) self.assertCountEqual(log, ['compute', 'inverse23']) self.assertEqual(record.foo, '1/2/X') self.assertEqual(record.bar1, '1') self.assertEqual(record.bar2, '2') self.assertEqual(record.bar3, 'X') self.assertCountEqual(log, ['compute', 'inverse23']) log.clear() # writing on 'foo' marks 'bar1', 'bar2', 'bar3' to compute record.write({'foo': 'A/B/C'}) self.assertCountEqual(log, []) # writing on 'bar1', 'bar2', 'bar3' discards the computation record.write({'bar1': 'X', 'bar2': 'Y', 'bar3': 'Z'}) self.assertCountEqual(log, ['inverse1', 'inverse23']) self.assertEqual(record.foo, 'X/Y/Z') self.assertEqual(record.bar1, 'X') self.assertEqual(record.bar2, 'Y') self.assertEqual(record.bar3, 'Z') self.assertCountEqual(log, ['inverse1', 'inverse23']) def test_13_inverse_access(self): """ test access rights on inverse fields """ foo = self.env['test_new_api.category'].create({'name': 'Foo'}) user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'}) self.assertFalse(user.has_group('base.group_system')) # add group on non-stored inverse field self.patch(type(foo).display_name, 'groups', 'base.group_system') with self.assertRaises(AccessError): foo.with_user(user).display_name = 'Forbidden' def test_13_inverse_with_unlink(self): """ test x2many delete command combined with an inverse field """ country1 = self.env['res.country'].create({'name': 'test country', 'code': 'ZV'}) country2 = self.env['res.country'].create({'name': 'other country', 'code': 'ZX'}) company = self.env['res.company'].create({ 'name': 'test company', 'child_ids': [ (0, 0, {'name': 'Child Company 1'}), (0, 0, {'name': 'Child Company 2'}), ] }) child_company = company.child_ids[0] # check first that the field has an inverse and is not stored field = type(company).country_id self.assertFalse(field.store) self.assertTrue(field.inverse) company.write({'country_id': country1.id}) self.assertEqual(company.country_id, country1) company.write({ 'country_id': country2.id, 'child_ids': [(2, child_company.id)], }) self.assertEqual(company.country_id, country2) def test_14_search(self): """ test search on computed fields """ discussion = self.env.ref('test_new_api.discussion_0') # determine message sizes sizes = set(message.size for message in discussion.messages) # search for messages based on their size for size in sizes: messages0 = self.env['test_new_api.message'].search( [('discussion', '=', discussion.id), ('size', '<=', size)]) messages1 = self.env['test_new_api.message'].browse() for message in discussion.messages: if message.size <= size: messages1 += message self.assertEqual(messages0, messages1) def test_15_constraint(self): """ test new-style Python constraints """ discussion = self.env.ref('test_new_api.discussion_0') self.env.flush_all() # remove oneself from discussion participants: we can no longer create # messages in discussion discussion.participants -= self.env.user with self.assertRaises(ValidationError): self.env['test_new_api.message'].create({'discussion': discussion.id, 'body': 'Whatever'}) # make sure that assertRaises() does not leave fields to recompute self.assertFalse(self.env.fields_to_compute()) # put back oneself into discussion participants: now we can create # messages in discussion discussion.participants += self.env.user self.env['test_new_api.message'].create({'discussion': discussion.id, 'body': 'Whatever'}) # check constraint on recomputed field self.assertTrue(discussion.messages) with self.assertRaises(ValidationError): discussion.name = "X" def test_15_constraint_inverse(self): """ test constraint method on normal field and field with inverse """ log = [] model = self.env['test_new_api.compute.inverse'].with_context(log=log, log_constraint=True) # create/write with normal field only log.clear() record = model.create({'baz': 'Hi'}) self.assertCountEqual(log, ['constraint']) log.clear() record.write({'baz': 'Ho'}) self.assertCountEqual(log, ['constraint']) # create/write with inverse field only log.clear() record = model.create({'bar': 'Hi'}) self.assertCountEqual(log, ['inverse', 'constraint']) log.clear() record.write({'bar': 'Ho'}) self.assertCountEqual(log, ['inverse', 'constraint']) # create/write with both fields only log.clear() record = model.create({'bar': 'Hi', 'baz': 'Hi'}) self.assertCountEqual(log, ['inverse', 'constraint']) log.clear() record.write({'bar': 'Ho', 'baz': 'Ho'}) self.assertCountEqual(log, ['inverse', 'constraint']) def test_16_compute_unassigned(self): model = self.env['test_new_api.compute.unassigned'] # real record record = model.create({}) with self.assertRaises(ValueError): record.bar self.assertEqual(record.bare, False) self.assertEqual(record.bars, False) self.assertEqual(record.bares, False) # new record record = model.new() with self.assertRaises(ValueError): record.bar self.assertEqual(record.bare, False) self.assertEqual(record.bars, False) self.assertEqual(record.bares, False) def test_16_compute_unassigned_access_error(self): # create two records records = self.env['test_new_api.compute.unassigned'].create([{}, {}]) self.env.flush_all() # alter access rights: regular users cannot read 'records' access = self.env.ref('test_new_api.access_test_new_api_compute_unassigned') access.perm_read = False self.env.flush_all() # switch to environment with user demo records = records.with_user(self.user_demo) # check that records are not accessible with self.assertRaises(AccessError): records[0].bars with self.assertRaises(AccessError): records[1].bars # Modify the records and flush() changes with the current environment: # this should not trigger an access error, whatever the order in which # records are considered. It may fail in the following scenario: # - mark field 'bars' to compute on records # - access records[0].bars # - recompute bars on records (both) -> assign records[0] only # - return records[0].bars from cache # - access records[1].bars # - recompute nothing (done already) # - records[1].bars is not in cache # - fetch records[1].bars -> access error records[0].foo = "assign" records[1].foo = "x" self.env.flush_all() # try the other way around, too self.env.invalidate_all() records[0].foo = "x" records[1].foo = "assign" self.env.flush_all() def test_17_compute_depends_on_many2many(self): user1, user2, user3 = self.env['test_new_api.user'].create([{}, {}, {}]) group = self.env['test_new_api.group'].create({'user_ids': [Command.link(user1.id)]}) self.env.flush_all() field = type(user1).group_count self.assertFalse(self.env.records_to_compute(field)) # should mark user2 and user3 to compute only group.write({'user_ids': [Command.link(user1.id), Command.link(user2.id), Command.link(user3.id)]}) self.assertEqual(self.env.records_to_compute(field), user2 + user3) # should mark user2 to compute only self.env.flush_all() group.write({'user_ids': [Command.unlink(user2.id)]}) self.assertEqual(self.env.records_to_compute(field), user2) # should mark user2 and user3 to compute only self.env.flush_all() group.write({'user_ids': [Command.set([user1.id, user2.id])]}) self.assertEqual(self.env.records_to_compute(field), user2 + user3) # should mark user3 to compute only self.env.flush_all() user3.write({'group_ids': [Command.link(group.id)]}) self.assertEqual(self.env.records_to_compute(field), user3) # similar with new records, but only check recomputation user1 = self.env['test_new_api.user'].new({}) user2 = self.env['test_new_api.user'].new({}) group = self.env['test_new_api.group'].new({'user_ids': [user1.id]}) self.assertEqual(user1.group_count, 1) self.assertEqual(user2.group_count, 0) group.user_ids += user2 self.assertEqual(user1.group_count, 1) self.assertEqual(user2.group_count, 1) def test_20_float(self): """ test rounding of float fields """ record = self.env['test_new_api.mixed'].create({}) query = "SELECT 1 FROM test_new_api_mixed WHERE id=%s AND number=%s" # 2.49609375 (exact float) must be rounded to 2.5 record.write({'number': 2.49609375}) self.env.flush_all() self.cr.execute(query, [record.id, '2.5']) self.assertTrue(self.cr.rowcount) self.assertEqual(record.number, 2.5) # 1.1 (1.1000000000000000888178420 in float) must be 1.1 in database record.write({'number': 1.1}) self.env.flush_all() self.cr.execute(query, [record.id, '1.1']) self.assertTrue(self.cr.rowcount) self.assertEqual(record.number, 1.1) def test_21_float_digits(self): """ test field description """ precision = self.env.ref('test_new_api.decimal_new_api_number') description = self.env['test_new_api.mixed'].fields_get()['number2'] self.assertEqual(description['digits'], (16, precision.digits)) def check_monetary(self, record, amount, currency, msg=None): # determine the possible roundings of amount if currency: ramount = currency.round(amount) samount = float(float_repr(ramount, currency.decimal_places)) else: ramount = samount = amount # check the currency on record self.assertEqual(record.currency_id, currency) # check the value on the record self.assertIn(record.amount, [ramount, samount], msg) # check the value in the database self.env.flush_all() self.cr.execute('SELECT amount FROM test_new_api_mixed WHERE id=%s', [record.id]) value = self.cr.fetchone()[0] self.assertEqual(value, samount, msg) def test_20_monetary(self): """ test monetary fields """ model = self.env['test_new_api.mixed'] currency = self.env['res.currency'].with_context(active_test=False) amount = 14.70126 for rounding in [0.01, 0.0001, 1.0, 0]: # first retrieve a currency corresponding to rounding if rounding: currency = currency.search([('rounding', '=', rounding)], limit=1) self.assertTrue(currency, "No currency found for rounding %s" % rounding) else: # rounding=0 corresponds to currency=False currency = currency.browse() # case 1: create with amount and currency record = model.create({'amount': amount, 'currency_id': currency.id}) self.check_monetary(record, amount, currency, 'create(amount, currency)') # case 2: assign amount record.amount = 0 record.amount = amount self.check_monetary(record, amount, currency, 'assign(amount)') # case 3: write with amount and currency record.write({'amount': 0, 'currency_id': False}) record.write({'amount': amount, 'currency_id': currency.id}) self.check_monetary(record, amount, currency, 'write(amount, currency)') # case 4: write with amount only record.write({'amount': 0}) record.write({'amount': amount}) self.check_monetary(record, amount, currency, 'write(amount)') # case 5: write with amount on several records records = record + model.create({'currency_id': currency.id}) records.write({'amount': 0}) records.write({'amount': amount}) for record in records: self.check_monetary(record, amount, currency, 'multi write(amount)') def test_20_monetary_opw_2223134(self): """ test monetary fields with cache override """ model = self.env['test_new_api.monetary_order'] currency = self.env.ref('base.USD') def check(value): self.assertEqual(record.total, value) self.env.flush_all() self.cr.execute('SELECT total FROM test_new_api_monetary_order WHERE id=%s', [record.id]) [total] = self.cr.fetchone() self.assertEqual(total, value) # create, and compute amount record = model.create({ 'currency_id': currency.id, 'line_ids': [Command.create({'subtotal': 1.0})], }) check(1.0) # delete and add a line: the deletion of the line clears the cache, then # the recomputation of 'total' must prefetch record.currency_id without # screwing up the new value in cache record.write({ 'line_ids': [Command.delete(record.line_ids.id), Command.create({'subtotal': 1.0})], }) check(1.0) def test_20_monetary_related(self): """ test value rounding with related currency """ currency = self.env.ref('base.USD') monetary_base = self.env['test_new_api.monetary_base'].create({ 'base_currency_id': currency.id }) monetary_related = self.env['test_new_api.monetary_related'].create({ 'monetary_id': monetary_base.id, 'total': 1/3, }) self.env.cr.execute( "SELECT total FROM test_new_api_monetary_related WHERE id=%s", monetary_related.ids, ) [total] = self.env.cr.fetchone() self.assertEqual(total, .33) def test_20_like(self): """ test filtered_domain() on char fields. """ record = self.env['test_new_api.multi.tag'].create({'name': 'Foo'}) self.assertTrue(record.filtered_domain([('name', 'like', 'F')])) self.assertTrue(record.filtered_domain([('name', 'ilike', 'f')])) record.name = 'Bar' self.assertFalse(record.filtered_domain([('name', 'like', 'F')])) self.assertFalse(record.filtered_domain([('name', 'ilike', 'f')])) record.name = False self.assertFalse(record.filtered_domain([('name', 'like', 'F')])) self.assertFalse(record.filtered_domain([('name', 'ilike', 'f')])) def test_21_date(self): """ test date fields """ record = self.env['test_new_api.mixed'].create({}) # one may assign False or None record.date = None self.assertFalse(record.date) # one may assign date but not datetime objects record.date = date(2012, 5, 1) self.assertEqual(record.date, date(2012, 5, 1)) # DLE P41: We now support to assign datetime to date. Not sure this is the good practice though. # with self.assertRaises(TypeError): # record.date = datetime(2012, 5, 1, 10, 45, 0) # one may assign dates and datetime in the default format, and it must be checked record.date = '2012-05-01' self.assertEqual(record.date, date(2012, 5, 1)) record.date = "2012-05-01 10:45:00" self.assertEqual(record.date, date(2012, 5, 1)) with self.assertRaises(ValueError): record.date = '12-5-1' # check filtered_domain self.assertTrue(record.filtered_domain([('date', '<', '2012-05-02')])) self.assertTrue(record.filtered_domain([('date', '<', date(2012, 5, 2))])) self.assertTrue(record.filtered_domain([('date', '<', datetime(2012, 5, 2, 12, 0, 0))])) self.assertTrue(record.filtered_domain([('date', '!=', False)])) self.assertFalse(record.filtered_domain([('date', '=', False)])) record.date = None self.assertFalse(record.filtered_domain([('date', '<', '2012-05-02')])) self.assertFalse(record.filtered_domain([('date', '<', date(2012, 5, 2))])) self.assertFalse(record.filtered_domain([('date', '<', datetime(2012, 5, 2, 12, 0, 0))])) self.assertFalse(record.filtered_domain([('date', '!=', False)])) self.assertTrue(record.filtered_domain([('date', '=', False)])) def test_21_datetime(self): """ test datetime fields """ for i in range(0, 10): self.assertEqual(fields.Datetime.now().microsecond, 0) record = self.env['test_new_api.mixed'].create({}) # assign falsy value record.moment = None self.assertFalse(record.moment) # assign string record.moment = '2012-05-01' self.assertEqual(record.moment, datetime(2012, 5, 1)) record.moment = '2012-05-01 06:00:00' self.assertEqual(record.moment, datetime(2012, 5, 1, 6)) with self.assertRaises(ValueError): record.moment = '12-5-1' # assign date or datetime record.moment = date(2012, 5, 1) self.assertEqual(record.moment, datetime(2012, 5, 1)) record.moment = datetime(2012, 5, 1, 6) self.assertEqual(record.moment, datetime(2012, 5, 1, 6)) # check filtered_domain self.assertTrue(record.filtered_domain([('moment', '<', '2012-05-02')])) self.assertTrue(record.filtered_domain([('moment', '<', date(2012, 5, 2))])) self.assertTrue(record.filtered_domain([('moment', '<', datetime(2012, 5, 1, 12, 0, 0))])) self.assertTrue(record.filtered_domain([('moment', '!=', False)])) self.assertFalse(record.filtered_domain([('moment', '=', False)])) record.moment = None self.assertFalse(record.filtered_domain([('moment', '<', '2012-05-02')])) self.assertFalse(record.filtered_domain([('moment', '<', date(2012, 5, 2))])) self.assertFalse(record.filtered_domain([('moment', '<', datetime(2012, 5, 2, 12, 0, 0))])) self.assertFalse(record.filtered_domain([('moment', '!=', False)])) self.assertTrue(record.filtered_domain([('moment', '=', False)])) def test_21_date_datetime_helpers(self): """ test date/datetime fields helpers """ _date = fields.Date.from_string("2077-10-23") _datetime = fields.Datetime.from_string("2077-10-23 09:42:00") # addition self.assertEqual(add(_date, days=5), date(2077, 10, 28)) self.assertEqual(add(_datetime, seconds=10), datetime(2077, 10, 23, 9, 42, 10)) # subtraction self.assertEqual(subtract(_date, months=1), date(2077, 9, 23)) self.assertEqual(subtract(_datetime, hours=2), datetime(2077, 10, 23, 7, 42, 0)) # start_of # year self.assertEqual(start_of(_date, 'year'), date(2077, 1, 1)) self.assertEqual(start_of(_datetime, 'year'), datetime(2077, 1, 1)) # quarter q1 = date(2077, 1, 1) q2 = date(2077, 4, 1) q3 = date(2077, 7, 1) q4 = date(2077, 10, 1) self.assertEqual(start_of(_date.replace(month=3), 'quarter'), q1) self.assertEqual(start_of(_date.replace(month=5), 'quarter'), q2) self.assertEqual(start_of(_date.replace(month=7), 'quarter'), q3) self.assertEqual(start_of(_date, 'quarter'), q4) self.assertEqual(start_of(_datetime, 'quarter'), datetime.combine(q4, time.min)) # month self.assertEqual(start_of(_date, 'month'), date(2077, 10, 1)) self.assertEqual(start_of(_datetime, 'month'), datetime(2077, 10, 1)) # week self.assertEqual(start_of(_date, 'week'), date(2077, 10, 18)) self.assertEqual(start_of(_datetime, 'week'), datetime(2077, 10, 18)) # day self.assertEqual(start_of(_date, 'day'), _date) self.assertEqual(start_of(_datetime, 'day'), _datetime.replace(hour=0, minute=0, second=0)) # hour with self.assertRaises(ValueError): start_of(_date, 'hour') self.assertEqual(start_of(_datetime, 'hour'), _datetime.replace(minute=0, second=0)) # invalid with self.assertRaises(ValueError): start_of(_datetime, 'poop') # end_of # year self.assertEqual(end_of(_date, 'year'), _date.replace(month=12, day=31)) self.assertEqual(end_of(_datetime, 'year'), datetime.combine(_date.replace(month=12, day=31), time.max)) # quarter q1 = date(2077, 3, 31) q2 = date(2077, 6, 30) q3 = date(2077, 9, 30) q4 = date(2077, 12, 31) self.assertEqual(end_of(_date.replace(month=2), 'quarter'), q1) self.assertEqual(end_of(_date.replace(month=4), 'quarter'), q2) self.assertEqual(end_of(_date.replace(month=9), 'quarter'), q3) self.assertEqual(end_of(_date, 'quarter'), q4) self.assertEqual(end_of(_datetime, 'quarter'), datetime.combine(q4, time.max)) # month self.assertEqual(end_of(_date, 'month'), _date.replace(day=31)) self.assertEqual(end_of(_datetime, 'month'), datetime.combine(date(2077, 10, 31), time.max)) # week self.assertEqual(end_of(_date, 'week'), date(2077, 10, 24)) self.assertEqual(end_of(_datetime, 'week'), datetime.combine(datetime(2077, 10, 24), time.max)) # day self.assertEqual(end_of(_date, 'day'), _date) self.assertEqual(end_of(_datetime, 'day'), datetime.combine(_datetime, time.max)) # hour with self.assertRaises(ValueError): end_of(_date, 'hour') self.assertEqual(end_of(_datetime, 'hour'), datetime.combine(_datetime, time.max).replace(hour=_datetime.hour)) # invalid with self.assertRaises(ValueError): end_of(_datetime, 'crap') def test_22_selection(self): """ test selection fields """ record_list = self.env['test_new_api.selection'].create({}) self.assertIsInstance(record_list._fields['state'].selection, list) # the following selection is defined by a callable (method name) record_call = self.env['test_new_api.mixed'].create({}) self.assertIsInstance(record_call._fields['lang'].selection, str) # one may assign a value record_list.state = 'foo' record_call.lang = self.env['res.lang'].search([], limit=1).code # one may assign False or None record_list.state = None self.assertFalse(record_list.state) record_call.lang = None self.assertFalse(record_call.lang) # the assigned value is only checked for the list case with self.assertRaises(ValueError): record_list.state = 'zz_ZZ' record_call.lang = 'zz_ZZ' def test_23_relation(self): """ test relation fields """ demo = self.user_demo message = self.env.ref('test_new_api.message_0_0') # check environment of record and related records self.assertEqual(message.env, self.env) self.assertEqual(message.discussion.env, self.env) demo_env = self.env(user=demo) self.assertNotEqual(demo_env, self.env) # check environment of record and related records self.assertEqual(message.env, self.env) self.assertEqual(message.discussion.env, self.env) # "migrate" message into demo_env, and check again demo_message = message.with_user(demo) self.assertEqual(demo_message.env, demo_env) self.assertEqual(demo_message.discussion.env, demo_env) # See YTI FIXME self.env.invalidate_all() # assign record's parent to a record in demo_env message.discussion = message.discussion.copy({'name': 'Copy'}) # both message and its parent field must be in self.env self.assertEqual(message.env, self.env) self.assertEqual(message.discussion.env, self.env) def test_24_reference(self): """ test reference fields. """ record = self.env['test_new_api.mixed'].create({}) # one may assign False or None record.reference = None self.assertFalse(record.reference) # one may assign a user or a partner... record.reference = self.env.user self.assertEqual(record.reference, self.env.user) record.reference = self.env.user.partner_id self.assertEqual(record.reference, self.env.user.partner_id) # ... but no record from a model that starts with 'ir.' with self.assertRaises(ValueError): record.reference = self.env['ir.model'].search([], limit=1) def test_25_related(self): """ test related fields. """ message = self.env.ref('test_new_api.message_0_0') discussion = message.discussion # by default related fields are not stored field = message._fields['discussion_name'] self.assertFalse(field.store) self.assertFalse(field.readonly) # check value of related field self.assertEqual(message.discussion_name, discussion.name) # change discussion name, and check result discussion.name = 'Foo' self.assertEqual(message.discussion_name, 'Foo') # change discussion name via related field, and check result message.discussion_name = 'Bar' self.assertEqual(discussion.name, 'Bar') self.assertEqual(message.discussion_name, 'Bar') # change discussion name via related field on several records discussion1 = discussion.create({'name': 'X1'}) discussion2 = discussion.create({'name': 'X2'}) discussion1.participants = discussion2.participants = self.env.user message1 = message.create({'discussion': discussion1.id}) message2 = message.create({'discussion': discussion2.id}) self.assertEqual(message1.discussion_name, 'X1') self.assertEqual(message2.discussion_name, 'X2') (message1 + message2).write({'discussion_name': 'X3'}) self.assertEqual(discussion1.name, 'X3') self.assertEqual(discussion2.name, 'X3') # search on related field, and check result search_on_related = self.env['test_new_api.message'].search([('discussion_name', '=', 'Bar')]) search_on_regular = self.env['test_new_api.message'].search([('discussion.name', '=', 'Bar')]) self.assertEqual(search_on_related, search_on_regular) # check that field attributes are copied message_field = message.fields_get(['discussion_name'])['discussion_name'] discussion_field = discussion.fields_get(['name'])['name'] self.assertEqual(message_field['help'], discussion_field['help']) def test_25_related_attributes(self): """ test the attributes of related fields """ text = self.registry['test_new_api.foo'].text self.assertFalse(text.trim, "The target field is defined with trim=False") # trim=True is the default on the field's class self.assertTrue(type(text).trim, "By default, a Char field has trim=True") # the parameter 'trim' is not set in text1's definition, so the field # retrieves its value from text.trim text1 = self.registry['test_new_api.bar'].text1 self.assertFalse(text1.trim, "The related field retrieves trim=False from target") # text2 is defined with trim=True, so it should get that value text2 = self.registry['test_new_api.bar'].text2 self.assertTrue(text2.trim, "The related field was defined with trim=True") def test_25_related_single(self): """ test related fields with a single field in the path. """ record = self.env['test_new_api.related'].create({'name': 'A'}) self.assertEqual(record.related_name, record.name) self.assertEqual(record.related_related_name, record.name) # check searching on related fields records0 = self._search(record, [('name', '=', 'A')]) self.assertIn(record, records0) records1 = self._search(record, [('related_name', '=', 'A')]) self.assertEqual(records1, records0) records2 = self._search(record, [('related_related_name', '=', 'A')]) self.assertEqual(records2, records0) # check writing on related fields record.write({'related_name': 'B'}) self.assertEqual(record.name, 'B') record.write({'related_related_name': 'C'}) self.assertEqual(record.name, 'C') def test_25_related_multi(self): """ test write() on several related fields based on a common computed field. """ foo = self.env['test_new_api.foo'].create({'name': 'A', 'value1': 1, 'value2': 2}) oof = self.env['test_new_api.foo'].create({'name': 'B', 'value1': 1, 'value2': 2}) bar = self.env['test_new_api.bar'].create({'name': 'A'}) self.assertEqual(bar.foo, foo) self.assertEqual(bar.value1, 1) self.assertEqual(bar.value2, 2) self.env.invalidate_all() bar.write({'value1': 3, 'value2': 4}) self.assertEqual(foo.value1, 3) self.assertEqual(foo.value2, 4) # modify 'name', and search on 'foo': this should flush 'name' bar.name = 'B' self.assertEqual(bar.foo, oof) self.assertIn(bar, bar.search([('foo', 'in', oof.ids)])) def test_25_one2many_inverse_related(self): left = self.env['test_new_api.trigger.left'].create({}) right = self.env['test_new_api.trigger.right'].create({}) self.assertFalse(left.right_id) self.assertFalse(right.left_ids) self.assertFalse(right.left_size) # create middle: this should trigger left.right_id by traversing # middle.left_id, and right.left_size by traversing left.right_id # after its computation! middle = self.env['test_new_api.trigger.middle'].create({ 'left_id': left.id, 'right_id': right.id, }) self.assertEqual(left.right_id, right) self.assertEqual(right.left_ids, left) self.assertEqual(right.left_size, 1) # delete middle: this should trigger left.right_id by traversing # middle.left_id, and right.left_size by traversing left.right_id # before its computation! middle.unlink() self.assertFalse(left.right_id) self.assertFalse(right.left_ids) self.assertFalse(right.left_size) def test_26_inherited(self): """ test inherited fields. """ # a bunch of fields are inherited from res_partner for user in self.env['res.users'].search([]): partner = user.partner_id for field in ('is_company', 'name', 'email', 'country_id'): self.assertEqual(getattr(user, field), getattr(partner, field)) self.assertEqual(user[field], partner[field]) def test_27_company_dependent(self): """ test company-dependent fields. """ # Company-dependent field variants should handle 0, '' and NULL database values # in the same way as their 'normal' (non-company-dependent) variants. # This section relies on there being no company defaults, so it needs to run first. null_record = self.env['test_new_api.company'].create({}) null_record_normal = self.env['test_new_api.mixed'].create({}) null_record.invalidate_recordset() null_record_normal.invalidate_recordset() field_correspondence = [ ('foo', 'foo', ''), ('text', 'text', ''), ('date', 'date', False), ('moment', 'moment', False), ('truth', 'truth', False), ('count', 'count', 0), ('phi', 'number2', 0.0), ('html1', 'comment1', ''), ] # Check null values for field, normal_field, value_to_write in field_correspondence: self.assertEqual(null_record[field], null_record_normal[normal_field]) null_record[field] = null_record_normal[normal_field] = value_to_write # Check empty / 0 values null_record.invalidate_recordset() null_record_normal.invalidate_recordset() for field, normal_field, _ in field_correspondence: self.assertEqual(null_record[field], null_record_normal[normal_field]) # consider three companies company0 = self.env.ref('base.main_company') company1 = self.env['res.company'].create({'name': 'A'}) company2 = self.env['res.company'].create({'name': 'B'}) # create one user per company user0 = self.env['res.users'].create({ 'name': 'Foo', 'login': 'foo', 'company_id': company0.id, 'company_ids': [Command.set([company0.id, company1.id, company2.id])]}) user1 = self.env['res.users'].create({ 'name': 'Bar', 'login': 'bar', 'company_id': company1.id, 'company_ids': [Command.set([company0.id, company1.id, company2.id])]}) user2 = self.env['res.users'].create({ 'name': 'Baz', 'login': 'baz', 'company_id': company2.id, 'company_ids': [Command.set([company0.id, company1.id, company2.id])]}) # create values for many2one field tag0 = self.env['test_new_api.multi.tag'].create({'name': 'Qux'}) tag1 = self.env['test_new_api.multi.tag'].create({'name': 'Quux'}) tag2 = self.env['test_new_api.multi.tag'].create({'name': 'Quuz'}) # create default values for the company-dependent fields self.env['ir.default'].set('test_new_api.company', 'foo', 'default') self.env['ir.default'].set('test_new_api.company', 'foo', 'default1', company_id=company1.id) self.env['ir.default'].set('test_new_api.company', 'tag_id', tag0.id) # assumption: users don't have access to 'ir.default' accesses = self.env['ir.model.access'].search([('model_id.model', '=', 'ir.default')]) accesses.write(dict.fromkeys(['perm_read', 'perm_write', 'perm_create', 'perm_unlink'], False)) # create/modify a record, and check the value for each user record = self.env['test_new_api.company'].create({ 'foo': 'main', 'date': '1932-11-09', 'moment': '1932-11-09 00:00:00', 'tag_id': tag1.id, }) self.assertEqual(record.with_user(user0).foo, 'main') self.assertEqual(record.with_user(user1).foo, 'default1') self.assertEqual(record.with_user(user2).foo, 'default') self.assertEqual(str(record.with_user(user0).date), '1932-11-09') self.assertEqual(record.with_user(user1).date, False) self.assertEqual(record.with_user(user2).date, False) self.assertEqual(str(record.with_user(user0).moment), '1932-11-09 00:00:00') self.assertEqual(record.with_user(user1).moment, False) self.assertEqual(record.with_user(user2).moment, False) self.assertEqual(record.with_user(user0).tag_id, tag1) self.assertEqual(record.with_user(user1).tag_id, tag0) self.assertEqual(record.with_user(user2).tag_id, tag0) record.with_user(user1).write({ 'foo': 'alpha', 'date': '1932-12-10', 'moment': '1932-12-10 23:59:59', 'tag_id': tag2.id, }) self.assertEqual(record.with_user(user0).foo, 'main') self.assertEqual(record.with_user(user1).foo, 'alpha') self.assertEqual(record.with_user(user2).foo, 'default') self.assertEqual(str(record.with_user(user0).date), '1932-11-09') self.assertEqual(str(record.with_user(user1).date), '1932-12-10') self.assertEqual(record.with_user(user2).date, False) self.assertEqual(str(record.with_user(user0).moment), '1932-11-09 00:00:00') self.assertEqual(str(record.with_user(user1).moment), '1932-12-10 23:59:59') self.assertEqual(record.with_user(user2).moment, False) self.assertEqual(record.with_user(user0).tag_id, tag1) self.assertEqual(record.with_user(user1).tag_id, tag2) self.assertEqual(record.with_user(user2).tag_id, tag0) # regression: duplicated records caused values to be browse(browse(id)) recs = record.create({}) + record + record self.env.invalidate_all() for rec in recs.with_user(user0): self.assertIsInstance(rec.tag_id.id, int) # unlink value of a many2one (tag2), and check again tag2.unlink() self.assertEqual(record.with_user(user0).tag_id, tag1) self.assertEqual(record.with_user(user1).tag_id, tag0.browse()) self.assertEqual(record.with_user(user2).tag_id, tag0) record.with_user(user1).foo = False self.assertEqual(record.with_user(user0).foo, 'main') self.assertEqual(record.with_user(user1).foo, False) self.assertEqual(record.with_user(user2).foo, 'default') record.with_user(user0).with_company(company1).foo = 'beta' self.env.invalidate_all() self.assertEqual(record.with_user(user0).foo, 'main') self.assertEqual(record.with_user(user1).foo, 'beta') self.assertEqual(record.with_user(user2).foo, 'default') # add group on company-dependent field self.assertFalse(user0.has_group('base.group_system')) self.patch(type(record).foo, 'groups', 'base.group_system') with self.assertRaises(AccessError): record.with_user(user0).foo = 'forbidden' user0.write({'groups_id': [Command.link(self.env.ref('base.group_system').id)]}) record.with_user(user0).foo = 'yes we can' # add ir.rule to prevent access on record self.assertTrue(user0._is_internal()) rule = self.env['ir.rule'].create({ 'model_id': self.env['ir.model']._get_id(record._name), 'groups': [self.env.ref('base.group_user').id], 'domain_force': str([('id', '!=', record.id)]), }) with self.assertRaises(AccessError): record.with_user(user0).foo = 'forbidden' # create company record and attribute company_record = self.env['test_new_api.company'].create({'foo': 'ABC'}) attribute_record = self.env['test_new_api.company.attr'].create({ 'company': company_record.id, 'quantity': 1, }) self.assertEqual(attribute_record.bar, 'ABC') # change quantity, 'bar' should recompute to 'ABCABC' attribute_record.quantity = 2 self.assertEqual(attribute_record.bar, 'ABCABC') # change company field 'foo', 'bar' should recompute to 'DEFDEF' company_record.foo = 'DEF' self.assertEqual(attribute_record.company.foo, 'DEF') self.assertEqual(attribute_record.bar, 'DEFDEF') # a low priviledge user should be able to search on company_dependent fields company_record.env.user.groups_id -= self.env.ref('base.group_system') self.assertFalse(company_record.env.user.has_group('base.group_system')) company_records = self.env['test_new_api.company'].search([('foo', '=', 'DEF')]) self.assertEqual(len(company_records), 1) def test_27_company_dependent_bool_integer_float(self): company0 = self.env.ref('base.main_company') company1 = self.env['res.company'].create({'name': 'A'}) Model = self.env['test_new_api.company'] record = Model.create({}) record.invalidate_recordset() cr = self.env.cr cr.execute("SELECT truth, count, phi FROM test_new_api_company WHERE id = %s", (record.id,)) self.assertEqual(cr.fetchone(), (None, None, None)) for company in [company0, company1]: record_company = record.with_company(company) self.assertEqual(record_company.truth, False) self.assertEqual(record_company.count, 0) self.assertEqual(record_company.phi, 0.0) record.write({'truth': False, 'count': 0, 'phi': 0}) # write fallback equivalent record.invalidate_recordset() cr.execute("SELECT truth, count, phi FROM test_new_api_company WHERE id = %s", (record.id,)) self.assertEqual(cr.fetchone(), (None, None, None)) # NULL doesn't block read cr.execute("UPDATE test_new_api_company SET truth = %s, count = %s, phi = %s WHERE id = %s", ( json.dumps({str(company0.id): None}), json.dumps({str(company0.id): None}), json.dumps({str(company0.id): None}), record.id, )) for company in [company0, company1]: record_company = record.with_company(company) self.assertEqual(record_company.truth, False) self.assertEqual(record_company.count, 0) self.assertEqual(record_company.phi, 0.0) def test_28_company_dependent_search(self): """ Test the search on company-dependent fields in all corner cases. This assumes that filtered_domain() correctly filters records when its domain refers to company-dependent fields. """ IrDefault = self.env['ir.default'] Model = self.env['test_new_api.company'] # create 4 records for all cases: two with explicit truthy values, one # with an explicit falsy value, and one without an explicit value records = Model.create([{}] * 4) record_fallback = Model.create({}) # For each field, we assign values to the records, and test a number of # searches. The search cases are given by comparison operators, and for # each operator, we test a number of possible operands. Every search() # returns a subset of the records, and we compare it to an equivalent # search performed by filtered_domain(). def test_field(field_name, truthy_values, operations): # set ir.defaults to all records except the last one for rec, val in zip(records, truthy_values + [False]): rec[field_name] = val # test without default value test_cases(field_name, operations) # set default value to False IrDefault.set(Model._name, field_name, False) self.env.flush_all() self.env.invalidate_all() for rec, val in zip(records, truthy_values + [False]): rec[field_name] = val test_cases(field_name, operations, False) # set default value to truthy_values[0] IrDefault.set(Model._name, field_name, truthy_values[0]) self.env.flush_all() self.env.invalidate_all() for rec, val in zip(records, truthy_values + [False]): rec[field_name] = val test_cases(field_name, operations, truthy_values[0]) def test_cases(field_name, operations, default=None): model = self.env['test_new_api.company'] field = model._fields[field_name] field_fallback = field.get_company_dependent_fallback(model) record_fallback[field_name] = field_fallback current_thread = threading.current_thread() for operator, values in operations.items(): for value in values: domain = [(field_name, operator, value)] company_dependent_column_not_null = not record_fallback.filtered_domain(domain) if company_dependent_column_not_null: with self.subTest(domain=domain, default=default): Model.search([('id', 'in', records.ids)] + domain) current_thread.query_count = 0 current_thread.query_time = 0 Model.search([('id', 'in', records.ids)] + domain) # warmup if current_thread.query_count: # parent_of and child_of may need extra queries expected_contained_sqls = [''] * (current_thread.query_count - 1) + [f'"test_new_api_company"."{field_name}" IS NOT NULL'] with self.assertQueriesContain(expected_contained_sqls): Model.search([('id', 'in', records.ids)] + domain) # TODO complement of dates, child_of and parent_of are not working correctly, skip for now test_complement = "date" not in field.type and operator not in ['child_of', 'parent_of'] with self.subTest(domain=domain, default=default): self._search( Model, [('id', 'in', records.ids)] + domain, [('id', 'in', records.ids)], test_complement=test_complement, ) # boolean fields test_field('truth', [True, True], { '=': (True, False), '!=': (True, False), }) # integer fields test_field('count', [10, -2], { '=': (10, -2, 0, False), '!=': (10, -2, 0, False), '<': (10, -2, 0), '>=': (10, -2, 0), '<=': (10, -2, 0), '>': (10, -2, 0), }) # float fields test_field('phi', [1.61803, -1], { '=': (1.61803, -1, 0, False), '!=': (1.61803, -1, 0, False), '<': (1.61803, -1, 0), '>=': (1.61803, -1, 0), '<=': (1.61803, -1, 0), '>': (1.61803, -1, 0), }) # char fields test_field('foo', ['qwer', 'azer'], { 'like': ('qwer', 'azer'), 'ilike': ('qwer', 'azer'), 'not like': ('qwer', 'azer'), 'not ilike': ('qwer', 'azer'), '=': ('qwer', 'azer', False), '!=': ('qwer', 'azer', False), 'not in': (['qwer', 'azer'], ['qwer', False], [False], []), 'in': (['qwer', 'azer'], ['qwer', False], [False], []), }) # date fields date1, date2 = date(2021, 11, 22), date(2021, 11, 23) test_field('date', [date1, date2], { '=': (date1, date2, False), '!=': (date1, date2, False), '<': (date1, date2), '>=': (date1, date2), '<=': (date1, date2), '>': (date1, date2), }) # datetime fields moment1, moment2 = datetime(2021, 11, 22), datetime(2021, 11, 23) test_field('moment', [moment1, moment2], { '=': (moment1, moment2, False), '!=': (moment1, moment2, False), '<': (moment1, moment2), '>=': (moment1, moment2), '<=': (moment1, moment2), '>': (moment1, moment2), }) # many2one fields tag1, tag2 = self.env['test_new_api.multi.tag'].create([{'name': 'one'}, {'name': 'two'}]) test_field('tag_id', [tag1.id, tag2.id], { 'like': (tag1.name, tag2.name), 'ilike': (tag1.name, tag2.name), 'not like': (tag1.name, tag2.name), 'not ilike': (tag1.name, tag2.name), '=': (tag1.id, tag2.id, False), '!=': (tag1.id, tag2.id, False), 'in': ([tag1.id, tag2.id], [tag2.id, False], [False], []), 'not in': ([tag1.id, tag2.id], [tag2.id, False], [False], []), 'any': ([('name', '=', tag1.name)], [('name', '=', False)], []), 'not any': ([('name', '=', tag1.name)], [('name', '=', False)], []), }) company0 = self.env.ref('base.main_company') company1 = self.env['res.company'].create({'name': 'A1', 'parent_id': company0.id}) company2 = self.env['res.company'].create({'name': 'B1', 'parent_id': company1.id}) company1.partner_id.parent_id = company0.partner_id company2.partner_id.parent_id = company1.partner_id self.env.invalidate_all() test_field('company_id', [company1.id, company2.id], { 'child_of': (company0.id, company1.id, company2.id), 'parent_of': (company0.id, company1.id, company2.id), }) test_field('partner_id', [company1.id, company2.id], { 'child_of': (company0.partner_id.id, company1.partner_id.id, company2.partner_id.id), 'parent_of': (company0.partner_id.id, company1.partner_id.id, company2.partner_id.id), }) def test_29_company_dependent_html(self): company0 = self.env.ref('base.main_company') company1 = self.env['res.company'].create({'name': 'A'}) company2 = self.env['res.company'].create({'name': 'B'}) user0 = self.env['res.users'].create({ 'name': 'Foo', 'login': 'foo', 'company_id': company0.id, 'company_ids': [Command.set([company0.id, company1.id, company2.id])]}) user1 = self.env['res.users'].create({ 'name': 'Bar', 'login': 'bar', 'company_id': company1.id, 'company_ids': [Command.set([company0.id, company1.id, company2.id])]}) user2 = self.env['res.users'].create({ 'name': 'Baz', 'login': 'baz', 'company_id': company2.id, 'company_ids': [Command.set([company0.id, company1.id, company2.id])]}) some_ugly_html_0 = """

Oops this should maybe be sanitized % if object.some_field and not object.oriented: % if object.other_field: ${object.mako_thing} This is some html. % endif %if object.dummy_field:

user0

%endif""" some_ugly_html_1 = """

Oops this should maybe be sanitized % if object.some_field and not object.oriented:

% if object.other_field: ${object.mako_thing} This is some html. % endif %if object.dummy_field:

user1

%endif""" record = self.env['test_new_api.company'].create({ 'html1': some_ugly_html_0, 'html2': some_ugly_html_0, }) self.assertEqual(record.with_user(user0).html1, some_ugly_html_0, 'Error in HTML field: content was sanitized but field has sanitize=False') self.assertEqual(record.with_user(user1).html1, False) self.assertEqual(record.with_user(user2).html1, False) # sanitize should have closed tags left open in the original html for user0 self.assertIn('
', record.with_user(user0).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True') self.assertIn('', record.with_user(user0).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True') self.assertNotIn('', record.with_user(user1).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True') self.assertIn('', record.with_user(user1).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True') self.assertNotIn('.__get__(existing) # -> records._fetch_field() # -> records.fetch(['categories']) # -> records.check_access('read') # -> records._check_access('read') # -> records.sudo().filtered_domain(...) # -> .__get__(existing) # -> records._fetch_field() # -> records.fetch(['name', ...]) # -> ONE QUERY to read ['name', ...] of records # -> ONE QUERY for deleted.exists() / code: forbidden = missing.exists() # -> ONE QUERY for records.exists() / code: self = self.exists() # -> ONE QUERY to read the many2many of existing existing.categories # this one must trigger a MissingError with self.assertRaises(MissingError): deleted.categories # special case: should not fail Discussion.browse([None]).read(['categories']) def test_40_real_vs_new(self): """ test field access on new records vs real records. """ Model = self.env['test_new_api.category'] real_record = Model.create({'name': 'Foo'}) new_origin = Model.new({'name': 'Bar'}, origin=real_record) new_record = Model.new({'name': 'Baz'}) # non-computed non-stored field: default value real_record = real_record.with_context(default_dummy='WTF') new_origin = new_origin.with_context(default_dummy='WTF') new_record = new_record.with_context(default_dummy='WTF') self.assertEqual(real_record.dummy, 'WTF') self.assertEqual(new_origin.dummy, 'WTF') self.assertEqual(new_record.dummy, 'WTF') # non-computed stored field: origin or default if no origin real_record = real_record.with_context(default_color=42) new_origin = new_origin.with_context(default_color=42) new_record = new_record.with_context(default_color=42) self.assertEqual(real_record.color, 0) self.assertEqual(new_origin.color, 0) self.assertEqual(new_record.color, 42) # computed non-stored field: always computed self.assertEqual(real_record.display_name, 'Foo') self.assertEqual(new_origin.display_name, 'Bar') self.assertEqual(new_record.display_name, 'Baz') # computed stored field: origin or computed if no origin Model = self.env['test_new_api.recursive'] real_record = Model.create({'name': 'Foo'}) new_origin = Model.new({'name': 'Bar'}, origin=real_record) new_record = Model.new({'name': 'Baz'}) self.assertEqual(real_record.display_name, 'Foo') self.assertEqual(new_origin.display_name, 'Bar') self.assertEqual(new_record.display_name, 'Baz') # computed stored field with recomputation: always computed real_record.name = 'Fool' new_origin.name = 'Barr' new_record.name = 'Bazz' self.assertEqual(real_record.display_name, 'Fool') self.assertEqual(new_origin.display_name, 'Barr') self.assertEqual(new_record.display_name, 'Bazz') def test_40_new_defaults(self): """ Test new records with defaults. """ user = self.env.user discussion = self.env.ref('test_new_api.discussion_0') # create a new message; fields have their default value if not given new_msg = self.env['test_new_api.message'].new({'body': "XXX"}) self.assertFalse(new_msg.id) self.assertEqual(new_msg.body, "XXX") self.assertEqual(new_msg.author, user) # assign some fields; should have no side effect new_msg.discussion = discussion new_msg.body = "YYY" self.assertEqual(new_msg.discussion, discussion) self.assertEqual(new_msg.body, "YYY") self.assertNotIn(new_msg, discussion.messages) # check computed values of fields self.assertEqual(new_msg.name, "[%s] %s" % (discussion.name, user.name)) self.assertEqual(new_msg.size, 3) # extra tests for x2many fields with default cat1 = self.env['test_new_api.category'].create({'name': "Cat1"}) cat2 = self.env['test_new_api.category'].create({'name': "Cat2"}) discussion = discussion.with_context(default_categories=[Command.link(cat1.id)]) # no value gives the default value new_disc = discussion.new({'name': "Foo"}) self.assertEqual(new_disc.categories._origin, cat1) # value overrides default value new_disc = discussion.new({'name': "Foo", 'categories': [Command.link(cat2.id)]}) self.assertEqual(new_disc.categories._origin, cat2) def test_40_new_fields(self): """ Test new records with relational fields. """ # create a new discussion with all kinds of relational fields msg0 = self.env['test_new_api.message'].create({'body': "XXX"}) msg1 = self.env['test_new_api.message'].create({'body': "WWW"}) cat0 = self.env['test_new_api.category'].create({'name': 'AAA'}) cat1 = self.env['test_new_api.category'].create({'name': 'DDD'}) new_disc = self.env['test_new_api.discussion'].new({ 'name': "Stuff", 'moderator': self.env.uid, 'messages': [ Command.link(msg0.id), Command.link(msg1.id), Command.update(msg1.id, {'body': "YYY"}), Command.create({'body': "ZZZ"}) ], 'categories': [ Command.link(cat0.id), Command.link(cat1.id), Command.update(cat1.id, {'name': "BBB"}), Command.create({'name': "CCC"}) ], }) self.assertFalse(new_disc.id) # many2one field values are actual records self.assertEqual(new_disc.moderator.id, self.env.uid) # x2many fields values are new records new_msg0, new_msg1, new_msg2 = new_disc.messages self.assertFalse(new_msg0.id) self.assertFalse(new_msg1.id) self.assertFalse(new_msg2.id) new_cat0, new_cat1, new_cat2 = new_disc.categories self.assertFalse(new_cat0.id) self.assertFalse(new_cat1.id) self.assertFalse(new_cat2.id) # the x2many has its inverse field set self.assertEqual(new_msg0.discussion, new_disc) self.assertEqual(new_msg1.discussion, new_disc) self.assertEqual(new_msg2.discussion, new_disc) self.assertFalse(msg0.discussion) self.assertFalse(msg1.discussion) self.assertEqual(new_cat0.discussions, new_disc) # add other discussions self.assertEqual(new_cat1.discussions, new_disc) self.assertEqual(new_cat2.discussions, new_disc) self.assertNotIn(new_disc, cat0.discussions) self.assertNotIn(new_disc, cat1.discussions) # new lines are connected to their origin self.assertEqual(new_msg0._origin, msg0) self.assertEqual(new_msg1._origin, msg1) self.assertFalse(new_msg2._origin) self.assertEqual(new_cat0._origin, cat0) self.assertEqual(new_cat1._origin, cat1) self.assertFalse(new_cat2._origin) # the field values are either specific, or the same as the origin self.assertEqual(new_msg0.body, "XXX") self.assertEqual(new_msg1.body, "YYY") self.assertEqual(new_msg2.body, "ZZZ") self.assertEqual(msg0.body, "XXX") self.assertEqual(msg1.body, "WWW") self.assertEqual(new_cat0.name, "AAA") self.assertEqual(new_cat1.name, "BBB") self.assertEqual(new_cat2.name, "CCC") self.assertEqual(cat0.name, "AAA") self.assertEqual(cat1.name, "DDD") # special case for many2one fields that define _inherits new_email = self.env['test_new_api.emailmessage'].new({'body': "XXX"}) self.assertFalse(new_email.id) self.assertTrue(new_email.message) self.assertFalse(new_email.message.id) self.assertEqual(new_email.body, "XXX") new_email = self.env['test_new_api.emailmessage'].new({'message': msg0.id}) self.assertFalse(new_email.id) self.assertFalse(new_email._origin) self.assertFalse(new_email.message.id) self.assertEqual(new_email.message._origin, msg0) self.assertEqual(new_email.body, "XXX") # check that this does not generate an infinite recursion new_disc._convert_to_write(new_disc._cache) def test_40_new_convert_to_write(self): new_disc = self.env['test_new_api.discussion'].new({ 'name': "Stuff", 'moderator': self.env.uid, 'participants': [(6, 0, self.env.user.ids)], }) # Put the user groups in the cache of the new record new_disc.participants.groups_id # Check that the groups in the cache are not returned by convert_to_write # because no real change happened, the values are identical except that # self.env.user.groups_id._ids = (Id1, Id2, ...) whereas # new_disc.participants.groups_id._ids = (NewId(origin=Id1), NewId(origin=Id2), ...) field = new_disc._fields.get("participants") # make sure that there is no inverse field for discussions on res_users, # as the test depends on it self.assertFalse(new_disc.pool.field_inverses[field]) convert = field.convert_to_write(new_disc["participants"], new_disc) self.assertEqual(convert, [(6, 0, self.env.user.ids)]) def test_40_new_inherited_fields(self): """ Test the behavior of new records with inherited fields. """ email = self.env['test_new_api.emailmessage'].new({'body': 'XXX'}) self.assertEqual(email.body, 'XXX') self.assertEqual(email.message.body, 'XXX') email.body = 'YYY' self.assertEqual(email.body, 'YYY') self.assertEqual(email.message.body, 'YYY') email.message.body = 'ZZZ' self.assertEqual(email.body, 'ZZZ') self.assertEqual(email.message.body, 'ZZZ') def test_40_new_ref_origin(self): """ Test the behavior of new records with ref/origin. """ Discussion = self.env['test_new_api.discussion'] new = Discussion.new # new records with identical/different refs xs = new() + new(ref='a') + new(ref='b') + new(ref='b') self.assertEqual([x == y for x in xs for y in xs], [ 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, ]) for x in xs: self.assertFalse(x._origin) # new records with identical/different origins a, b = Discussion.create([{'name': "A"}, {'name': "B"}]) xs = new() + new(origin=a) + new(origin=b) + new(origin=b) self.assertEqual([x == y for x in xs for y in xs], [ 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1, ]) self.assertFalse(xs[0]._origin) self.assertEqual(xs[1]._origin, a) self.assertEqual(xs[2]._origin, b) self.assertEqual(xs[3]._origin, b) self.assertEqual(xs._origin, a + b + b) self.assertEqual(xs._origin._origin, a + b + b) # new records with refs and origins x1 = new(ref='a') x2 = new(origin=b) self.assertNotEqual(x1, x2) # new discussion based on existing discussion disc = self.env.ref('test_new_api.discussion_0') new_disc = disc.new(origin=disc) self.assertFalse(new_disc.id) self.assertEqual(new_disc._origin, disc) self.assertEqual(new_disc.name, disc.name) # many2one field self.assertEqual(new_disc.moderator, disc.moderator) # one2many field self.assertTrue(new_disc.messages) self.assertNotEqual(new_disc.messages, disc.messages) self.assertEqual(new_disc.messages._origin, disc.messages) # many2many field self.assertTrue(new_disc.participants) self.assertNotEqual(new_disc.participants, disc.participants) self.assertEqual(new_disc.participants._origin, disc.participants) # provide many2one field as a dict of values; the value is a new record # with the given 'id' as origin (if given, of course) new_msg = disc.messages.new({ 'discussion': {'name': disc.name}, }) self.assertTrue(new_msg.discussion) self.assertFalse(new_msg.discussion.id) self.assertFalse(new_msg.discussion._origin) new_msg = disc.messages.new({ 'discussion': {'name': disc.name, 'id': disc.id}, }) self.assertTrue(new_msg.discussion) self.assertFalse(new_msg.discussion.id) self.assertEqual(new_msg.discussion._origin, disc) # check convert_to_write tag = self.env['test_new_api.multi.tag'].create({'name': 'Foo'}) rec = self.env['test_new_api.multi'].create({ 'lines': [(0, 0, {'tags': [(6, 0, tag.ids)]})], }) new = rec.new(origin=rec) self.assertEqual(new.lines.tags._origin, rec.lines.tags) vals = new._convert_to_write(new._cache) self.assertEqual(vals['lines'], [(6, 0, rec.lines.ids)]) def test_41_new_compute(self): """ Check recomputation of fields on new records. """ move = self.env['test_new_api.move'].create({ 'line_ids': [Command.create({'quantity': 1}), Command.create({'quantity': 1})], }) self.env.flush_all() line = move.line_ids[0] new_move = move.new(origin=move) new_line = line.new(origin=line) # move_id is fetched from origin self.assertEqual(new_line.move_id, move) self.assertEqual(new_move.quantity, 2) self.assertEqual(move.quantity, 2) # modifying new_line must trigger recomputation on new_move, even if # new_line.move_id is not new_move! new_line.quantity = 2 self.assertEqual(new_line.move_id, move) self.assertEqual(new_move.quantity, 3) self.assertEqual(move.quantity, 2) def test_41_new_one2many(self): """ Check command on one2many field on new record. """ move = self.env['test_new_api.move'].create({}) line = self.env['test_new_api.move_line'].create({'move_id': move.id, 'quantity': 1}) self.env.flush_all() new_move = move.new(origin=move) new_line = line.new(origin=line) self.assertEqual(new_move.line_ids, new_line) # drop line, and create a new one new_move.line_ids = [Command.delete(new_line.id), Command.create({'quantity': 2})] self.assertEqual(len(new_move.line_ids), 1) self.assertFalse(new_move.line_ids.id) self.assertEqual(new_move.line_ids.quantity, 2) # assign line to new move without origin new_move = move.new() new_move.line_ids = line self.assertFalse(new_move.line_ids.id) self.assertEqual(new_move.line_ids._origin, line) self.assertEqual(new_move.line_ids.move_id, new_move) def test_41_new_many2many(self): group = self.env['test_new_api.group'].create({}) user0 = self.env['test_new_api.user'].create({'group_ids': [Command.link(group.id)]}) new_user0 = user0.new(origin=user0) new_group = group.new(origin=group) self.env.invalidate_all() # creating new_user1 shoud not fetch new_group.user_ids, which is the # inverse of field new_user1.group_ids with self.assertQueryCount(0): new_user1 = self.env['test_new_api.user'].new({'group_ids': [Command.link(group.id)]}) self.assertEqual(new_user1.group_ids, new_group) # accessing new_group.user_ids should fetch group.user_ids and patch # new_group.user_ids with self.assertQueryCount(1): self.assertEqual(new_group.user_ids, new_user0 + new_user1) # creating new_user2 should patch new_group.user_ids immediately, since # it is in cache with self.assertQueryCount(0): new_user2 = self.env['test_new_api.user'].new({'group_ids': [Command.link(group.id)]}) self.assertEqual(new_user2.group_ids, new_group) self.assertEqual(new_group.user_ids, new_user0 + new_user1 + new_user2) # the patches on new_group.user_ids should not have changed group.user_ids self.assertEqual(group.user_ids, user0) @mute_logger('odoo.addons.base.models.ir_model') def test_41_new_related(self): """ test the behavior of related fields starting on new records. """ # make discussions unreadable for demo user access = self.env.ref('test_new_api.access_discussion') access.write({'perm_read': False}) # create an environment for demo user env = self.env(user=self.user_demo) self.assertEqual(env.user.login, "demo") # create a new message as demo user discussion = self.env.ref('test_new_api.discussion_0') message = env['test_new_api.message'].new({'discussion': discussion}) self.assertEqual(message.discussion, discussion) # read the related field discussion_name self.assertEqual(message.discussion.env, env) self.assertEqual(message.discussion_name, discussion.name) # DLE P75: message.discussion.name is put in the cache as sudo thanks to the computation of message.discussion_name # As we decided that now if we had the chance to access the value at some point in the code, and that it was stored in the cache # it's not a big deal to no longer raise the accesserror, as we had the chance to get the value at some point # with self.assertRaises(AccessError): # message.discussion.name @mute_logger('odoo.addons.base.models.ir_model') def test_42_new_related(self): """ test the behavior of related fields traversing new records. """ # make discussions unreadable for demo user access = self.env.ref('test_new_api.access_discussion') access.write({'perm_read': False}) # create an environment for demo user env = self.env(user=self.user_demo) self.assertEqual(env.user.login, "demo") # create a new discussion and a new message as demo user discussion = env['test_new_api.discussion'].new({'name': 'Stuff'}) message = env['test_new_api.message'].new({'discussion': discussion}) self.assertEqual(message.discussion, discussion) # read the related field discussion_name self.assertNotEqual(message.sudo().env, message.env) self.assertEqual(message.discussion_name, discussion.name) def test_43_new_related(self): """ test the behavior of one2many related fields """ partner = self.env['res.partner'].create({ 'name': 'Foo', 'child_ids': [Command.create({'name': 'Bar'})], }) multi = self.env['test_new_api.multi'].new() multi.partner = partner self.assertEqual(multi.partners.mapped('name'), ['Bar']) def test_50_defaults(self): """ test default values. """ fields = ['discussion', 'body', 'author', 'size'] defaults = self.env['test_new_api.message'].default_get(fields) self.assertEqual(defaults, {'author': self.env.uid}) defaults = self.env['test_new_api.mixed'].default_get(['number']) self.assertEqual(defaults, {'number': 3.14}) def test_50_search_many2one(self): """ test search through a path of computed fields""" messages = self.env['test_new_api.message'].search( [('author_partner.name', '=', 'Marc Demo')]) self.assertEqual(messages, self.env.ref('test_new_api.message_0_1')) def test_51_search_many2one_ordered(self): """ test search on many2one ordered by id """ with self.assertQueries([''' SELECT "test_new_api_message"."id" FROM "test_new_api_message" WHERE ("test_new_api_message"."active" = %s) ORDER BY "test_new_api_message"."discussion" ''']): self.env['test_new_api.message'].search([], order='discussion') def test_52_search_many2one_active_test(self): Model = self.env['test_new_api.model_active_field'] active_parent = Model.create({'name': 'Parent'}) child_of_active = Model.create({'parent_id': active_parent.id}) inactive_parent = Model.create({'name': 'Parent', 'active': False}) child_of_inactive = Model.create({'parent_id': inactive_parent.id}) self.assertEqual( self._search(Model, [('parent_id.name', '=', 'Parent')]), child_of_active + child_of_inactive, ) self.assertEqual( self._search(Model, [('parent_id', '=', 'Parent')]), child_of_active + child_of_inactive, ) # weird semantics: active_parent is in both results but doesn't have a parent_id self.assertEqual( self._search(Model, [('parent_id', 'child_of', active_parent.id)]), active_parent + child_of_active, ) self.assertEqual( self._search(Model, [('parent_id', 'child_of', 'Parent')]), active_parent + child_of_active + child_of_inactive, ) def test_60_one2many_domain(self): """ test the cache consistency of a one2many field with a domain """ discussion = self.env.ref('test_new_api.discussion_0') message = discussion.messages[0] self.assertNotIn(message, discussion.important_messages) message.important = True self.assertIn(message, discussion.important_messages) # writing on very_important_messages should call its domain method self.assertIn(message, discussion.very_important_messages) discussion.write({'very_important_messages': [Command.clear()]}) self.assertFalse(discussion.very_important_messages) self.assertFalse(message.exists()) def test_60_many2many_domain(self): """ test the cache consistency of a many2many field with a domain """ tag = self.env['test_new_api.multi.tag'].create({'name': 'bar'}) record = self.env['test_new_api.multi'].create({'tags': tag.ids}) self.env.flush_all() self.env.invalidate_all() self.assertEqual(type(record).tags.domain, [('name', 'ilike', 'a')]) # the tag is in the many2many self.assertIn(tag, record.tags) # modify the tag; it should not longer be in the many2many tag.name = 'foo' self.assertNotIn(tag, record.tags) # modify again the tag; it should be back in the many2many tag.name = 'baz' self.assertIn(tag, record.tags) def test_70_x2many_write(self): discussion = self.env.ref('test_new_api.discussion_0') # See YTI FIXME self.env.invalidate_all() Message = self.env['test_new_api.message'] # There must be 3 messages, 0 important self.assertEqual(len(discussion.messages), 3) self.assertEqual(len(discussion.important_messages), 0) self.assertEqual(len(discussion.very_important_messages), 0) discussion.important_messages = [Command.create({ 'body': 'What is the answer?', 'important': True, })] # There must be 4 messages, 1 important self.assertEqual(len(discussion.messages), 4) self.assertEqual(len(discussion.important_messages), 1) self.assertEqual(len(discussion.very_important_messages), 1) discussion.very_important_messages |= Message.new({ 'body': '42', 'important': True, }) # There must be 5 messages, 2 important self.assertEqual(len(discussion.messages), 5) self.assertEqual(len(discussion.important_messages), 2) self.assertEqual(len(discussion.very_important_messages), 2) def test_70_relational_inverse(self): """ Check the consistency of relational fields with inverse(s). """ discussion = self.env.ref('test_new_api.discussion_0') demo_discussion = discussion.with_user(self.user_demo) # check that the demo user sees the same messages self.assertEqual(demo_discussion.messages, discussion.messages) # See YTI FIXME self.env.flush_all() self.env.invalidate_all() # add a message as user demo messages = demo_discussion.messages message = messages.create({'discussion': discussion.id}) self.assertEqual(demo_discussion.messages, messages + message) self.assertEqual(demo_discussion.messages, discussion.messages) # add a message as superuser messages = discussion.messages message = messages.create({'discussion': discussion.id}) self.assertEqual(discussion.messages, messages + message) self.assertEqual(demo_discussion.messages, discussion.messages) def test_71_relational_inverse(self): """ Check the consistency of relational fields with inverse(s). """ move1 = self.env['test_new_api.move'].create({}) move2 = self.env['test_new_api.move'].create({}) line = self.env['test_new_api.move_line'].create({'move_id': move1.id}) self.env.flush_all() self.env.invalidate_all() line.with_context(prefetch_fields=False).move_id # Setting 'move_id' updates the one2many field that is based on it, # which has a domain. Here we check that evaluating the domain does not # accidentally override 'move_id' (by prefetch). line.move_id = move2 self.assertEqual(line.move_id, move2) def test_72_relational_inverse(self): """ Check the consistency of relational fields with inverse(s). """ move1 = self.env['test_new_api.move'].create({}) move2 = self.env['test_new_api.move'].create({}) # makes sure that line.move_id is flushed before search line = self.env['test_new_api.move_line'].create({'move_id': move1.id}) moves = self.env['test_new_api.move'].search([('line_ids', 'in', line.id)]) self.assertEqual(moves, move1) # makes sure that line.move_id is flushed before search line.move_id = move2 moves = self.env['test_new_api.move'].search([('line_ids', 'in', line.id)]) self.assertEqual(moves, move2) def test_73_relational_inverse(self): """ Check the consistency of relational fields with inverse(s). """ discussion1, discussion2 = self.env['test_new_api.discussion'].create([ {'name': "discussion1"}, {'name': "discussion2"}, ]) category1, category2 = self.env['test_new_api.category'].create([ {'name': "category1"}, {'name': "category2"}, ]) # assumption: category12 and category21 are in different order, but are # in the same order when put in a set() category12 = category1 + category2 category21 = category2 + category1 self.assertNotEqual(category12.ids, category21.ids) self.assertEqual(list(set(category12.ids)), list(set(category21.ids))) # make sure discussion1.categories is in cache; the write() below should # update the cache of discussion1.categories by appending category12.ids discussion1.categories category12.write({'discussions': [Command.link(discussion1.id)]}) self.assertEqual(discussion1.categories.ids, category12.ids) # make sure discussion2.categories is in cache; the write() below should # update the cache of discussion2.categories by appending category21.ids discussion2.categories category21.write({'discussions': [Command.link(discussion2.id)]}) self.assertEqual(discussion2.categories.ids, category21.ids) def test_80_copy(self): discussion = self.env.ref('test_new_api.discussion_0') message = self.env.ref('test_new_api.message_0_0') message1 = self.env.ref('test_new_api.message_0_1') email = self.env.ref('test_new_api.emailmessage_0_0') self.assertEqual(email.message, message) self.env['res.lang']._activate_lang('fr_FR') # set a translation for message.label email.with_context(lang='fr_FR').label = "bonjour" self.assertEqual(message.with_context(lang='fr_FR').label, 'bonjour') self.assertFalse(message1.label) # setting the parent record should not copy its translations email.copy({'message': message1.id}) self.assertEqual(message.with_context(lang='fr_FR').label, 'bonjour') self.assertFalse(message1.label) # setting a one2many should not copy translations on the lines discussion.copy({'messages': [Command.set(message1.ids)]}) self.assertEqual(message.with_context(lang='fr_FR').label, 'bonjour') self.assertFalse(message1.label) def test_85_binary_guess_zip(self): from odoo.addons.base.tests.test_mimetypes import ZIP # Regular ZIP files can be uploaded by non-admin users self.env['test_new_api.binary_svg'].with_user(self.user_demo).create({ 'name': 'Test without attachment', 'image_wo_attachment': base64.b64decode(ZIP), }) def test_86_text_base64_guess_svg(self): from odoo.addons.base.tests.test_mimetypes import SVG with self.assertRaises(UserError) as e: self.env['test_new_api.binary_svg'].with_user(self.user_demo).create({ 'name': 'Test without attachment', 'image_wo_attachment': SVG.decode("utf-8"), }) self.assertEqual(e.exception.args[0], 'Only admins can upload SVG files.') def test_90_binary_svg(self): from odoo.addons.base.tests.test_mimetypes import SVG # This should work without problems self.env['test_new_api.binary_svg'].create({ 'name': 'Test without attachment', 'image_wo_attachment': SVG, }) # And this gives error with self.assertRaises(UserError): self.env['test_new_api.binary_svg'].with_user( self.user_demo, ).create({ 'name': 'Test without attachment', 'image_wo_attachment': SVG, }) def test_91_binary_svg_attachment(self): from odoo.addons.base.tests.test_mimetypes import SVG # This doesn't neuter SVG with admin record = self.env['test_new_api.binary_svg'].create({ 'name': 'Test without attachment', 'image_attachment': SVG, }) attachment = self.env['ir.attachment'].search([ ('res_model', '=', record._name), ('res_field', '=', 'image_attachment'), ('res_id', '=', record.id), ]) self.assertEqual(attachment.mimetype, 'image/svg+xml') # ...but this should be neutered with demo user record = self.env['test_new_api.binary_svg'].with_user( self.user_demo, ).create({ 'name': 'Test without attachment', 'image_attachment': SVG, }) attachment = self.env['ir.attachment'].search([ ('res_model', '=', record._name), ('res_field', '=', 'image_attachment'), ('res_id', '=', record.id), ]) self.assertEqual(attachment.mimetype, 'text/plain') def test_92_binary_self_avatar_svg(self): from odoo.addons.base.tests.test_mimetypes import SVG demo_user = self.user_demo # User demo changes his own avatar demo_user.with_user(demo_user).image_1920 = SVG # The SVG file should have been neutered attachment = self.env['ir.attachment'].search([ ('res_model', '=', demo_user.partner_id._name), ('res_field', '=', 'image_1920'), ('res_id', '=', demo_user.partner_id.id), ]) self.assertEqual(attachment.mimetype, 'text/plain') def test_93_monetary_related(self): """ Check the currency field on related monetary fields. """ # check base field model = self.env['test_new_api.monetary_base'] field = model._fields['amount'] self.assertEqual(field.get_currency_field(model), 'base_currency_id') # related fields must use the field 'currency_id' or 'x_currency_id' model = self.env['test_new_api.monetary_related'] field = model._fields['amount'] self.assertEqual(field.related, 'monetary_id.amount') self.assertEqual(field.get_currency_field(model), 'currency_id') model = self.env['test_new_api.monetary_custom'] field = model._fields['x_amount'] self.assertEqual(field.related, 'monetary_id.amount') self.assertEqual(field.get_currency_field(model), 'x_currency_id') # inherited field must use the same field as its parent field model = self.env['test_new_api.monetary_inherits'] field = model._fields['amount'] self.assertEqual(field.related, 'monetary_id.amount') self.assertEqual(field.get_currency_field(model), 'base_currency_id') def test_94_image(self): f = io.BytesIO() Image.new('RGB', (4000, 2000), '#4169E1').save(f, 'PNG') f.seek(0) image_w = base64.b64encode(f.read()) f = io.BytesIO() Image.new('RGB', (2000, 4000), '#4169E1').save(f, 'PNG') f.seek(0) image_h = base64.b64encode(f.read()) record = self.env['test_new_api.model_image'].create({ 'name': 'image', 'image': image_w, 'image_128': image_w, }) # test create (no resize) self.assertEqual(record.image, image_w) # test create (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_128))).size, (128, 64)) # test create related store (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (512, 256)) # test create related no store (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (256, 128)) # test create related store on column (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (64, 32)) record.write({ 'image': image_h, 'image_128': image_h, }) # test write (no resize) self.assertEqual(record.image, image_h) # test write (resize, height limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_128))).size, (64, 128)) # test write related store (resize, height limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (256, 512)) # test write related no store (resize, height limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (128, 256)) # test write related store on column (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (32, 64)) record = self.env['test_new_api.model_image'].create({ 'name': 'image', 'image': image_h, 'image_128': image_h, }) # test create (no resize) self.assertEqual(record.image, image_h) # test create (resize, height limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_128))).size, (64, 128)) # test create related store (resize, height limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (256, 512)) # test create related no store (resize, height limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (128, 256)) # test create related store on column (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (32, 64)) record.write({ 'image': image_w, 'image_128': image_w, }) # test write (no resize) self.assertEqual(record.image, image_w) # test write (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_128))).size, (128, 64)) # test write related store (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (512, 256)) # test write related store (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (256, 128)) # test write related store on column (resize, width limited) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (64, 32)) # test create inverse store record = self.env['test_new_api.model_image'].create({ 'name': 'image', 'image_512': image_w, }) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (512, 256)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image))).size, (4000, 2000)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (256, 128)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (64, 32)) # test write inverse store record.write({ 'image_512': image_h, }) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (256, 512)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image))).size, (2000, 4000)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (128, 256)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (32, 64)) # test create inverse no store record = self.env['test_new_api.model_image'].with_context(image_no_postprocess=True).create({ 'name': 'image', 'image_256': image_w, }) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (512, 256)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image))).size, (4000, 2000)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (256, 128)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (64, 32)) # test write inverse no store record.write({ 'image_256': image_h, }) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (256, 512)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image))).size, (2000, 4000)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (128, 256)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (32, 64)) # test create inverse stored column record = self.env['test_new_api.model_image'].with_context(image_no_postprocess=True).create({ 'name': 'image', 'image_64': image_w, }) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (512, 256)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image))).size, (4000, 2000)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (256, 128)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (64, 32)) # test write inverse stored column record.write({ 'image_64': image_h, }) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_512))).size, (256, 512)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image))).size, (2000, 4000)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_256))).size, (128, 256)) self.assertEqual(Image.open(io.BytesIO(base64.b64decode(record.image_64))).size, (32, 64)) # test bin_size record_bin_size = record.with_context(bin_size=True) self.assertEqual(record_bin_size.image, b'31.54 Kb') self.assertEqual(record_bin_size.image_512, b'1.02 Kb') self.assertEqual(record_bin_size.image_256, b'424.00 bytes') # non-attachment binary fields: value returned as str in a different # form, because coming from PostgreSQL instead of filestore self.assertEqual(record_bin_size.image_64, '148 bytes') # ensure image_data_uri works (value must be bytes and not string) self.assertEqual(record.image_256[:8], b'iVBORw0K') self.assertEqual(image_data_uri(record.image_256)[:30], '') # ensure invalid image raises with self.assertRaises(UserError), self.cr.savepoint(): record.write({ 'image': 'invalid image', }) # assignment of invalid image on new record does nothing, the value is # taken from origin instead (use-case: onchange) new_record = record.new(origin=record) new_record.image = '31.54 Kb' self.assertEqual(record.image, image_h) self.assertEqual(new_record.image, image_h) # assignment to new record with origin should not do any query with self.assertQueryCount(0): new_record.image = image_w def test_95_binary_bin_size_create(self): binary_value = base64.b64encode(b'content') binary_size = b'7.00 bytes' def assertBinaryValue(record, value): for field in ('binary', 'binary_related_store', 'binary_related_no_store'): self.assertEqual(record[field], value, f'Incorrect result for {field}') # created and first read without context record = self.env['test_new_api.model_binary'].create({'binary': binary_value}) record_no_bin_size = record.with_context(bin_size=False) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record, binary_value) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record_bin_size, binary_size) # created and first read with bin_size=False record_no_bin_size = self.env['test_new_api.model_binary'].with_context(bin_size=False).create({'binary': binary_value}) record = self.env['test_new_api.model_binary'].browse(record.id) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record, binary_value) assertBinaryValue(record_bin_size, binary_size) # created and first read with bin_size=True record_bin_size = self.env['test_new_api.model_binary'].with_context(bin_size=True).create({'binary': binary_value}) record = self.env['test_new_api.model_binary'].browse(record.id) record_no_bin_size = record.with_context(bin_size=False) assertBinaryValue(record_bin_size, binary_size) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record, binary_value) # created without context and flushed/invalidated with bin_size=True record = self.env['test_new_api.model_binary'].create({'binary': binary_value}) record.with_context(bin_size=True).env.invalidate_all() record_no_bin_size = record.with_context(bin_size=False) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record, binary_value) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record_bin_size, binary_size) # check computed binary field with arbitrary Python value record = self.env['test_new_api.model_binary'].create({}) record_no_bin_size = record.with_context(bin_size=False) record_bin_size = record.with_context(bin_size=True) expected_value = [(record.id, False)] self.assertEqual(record.binary_computed, expected_value) self.assertEqual(record_no_bin_size.binary_computed, expected_value) self.assertEqual(record_bin_size.binary_computed, expected_value) def test_95_binary_bin_size_write(self): binary_value = base64.b64encode(b'content') binary_size = b'7.00 bytes' def assertBinaryValue(record, value): for field in ('binary', 'binary_related_store', 'binary_related_no_store'): self.assertEqual(record[field], value, f'Incorrect result for {field}') # created and written without context record = self.env['test_new_api.model_binary'].create({}) record.write({'binary': binary_value}) record_no_bin_size = record.with_context(bin_size=False) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record, binary_value) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record_bin_size, binary_size) # created without context, written with bin_size=False record = self.env['test_new_api.model_binary'].create({}) record.with_context(bin_size=False).write({'binary': binary_value}) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record, binary_value) assertBinaryValue(record_bin_size, binary_size) # created without context, written with bin_size=True record = self.env['test_new_api.model_binary'].create({}) record.with_context(bin_size=True).write({'binary': binary_value}) record_no_bin_size = record.with_context(bin_size=False) assertBinaryValue(record_bin_size, binary_size) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record, binary_value) # created without context and flushed with bin_size=True record = self.env['test_new_api.model_binary'].create({}) record.write({'binary': binary_value}) record.with_context(bin_size=True).env.invalidate_all() record_no_bin_size = record.with_context(bin_size=False) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record, binary_value) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record_bin_size, binary_size) # created and written without context, flushed without bin_size record = self.env['test_new_api.model_binary'].create({}) record.write({'binary': binary_value}) record.env.invalidate_all() record_no_bin_size = record.with_context(bin_size=False) record_bin_size = record.with_context(bin_size=True) assertBinaryValue(record, binary_value) assertBinaryValue(record_no_bin_size, binary_value) assertBinaryValue(record_bin_size, binary_size) def test_96_order_m2o(self): belgium, congo = self.env['test_new_api.country'].create([ {'name': "Duchy of Brabant"}, {'name': "Congo"}, ]) cities = self.env['test_new_api.city'].create([ {'name': "Brussels", 'country_id': belgium.id}, {'name': "Kinshasa", 'country_id': congo.id}, ]) # cities are sorted by country_id, name self.assertEqual(cities.sorted().mapped('name'), ["Kinshasa", "Brussels"]) # change order of countries, and check sorted() belgium.name = "Belgium" self.assertEqual(cities.sorted().mapped('name'), ["Brussels", "Kinshasa"]) def test_97_ir_rule_m2m_field(self): """Ensures m2m fields can't be read if the left records can't be read. Also makes sure reading m2m doesn't take more queries than necessary.""" tag = self.env['test_new_api.multi.tag'].create({}) record = self.env['test_new_api.multi.line'].create({ 'name': 'image', 'tags': [Command.link(tag.id)], }) # only one query as admin: reading pivot table with self.assertQueryCount(1): # trick: if value is in cache, read() does not make any query record.invalidate_recordset(['tags']) record.read(['tags']) user = self.env['res.users'].create({'name': "user", 'login': "user"}) record_user = record.with_user(user) # prep the following query count by caching access check related data record_user.invalidate_recordset(['tags']) record_user.read(['tags']) # only one query as user: reading pivot table with self.assertQueryCount(1): # trick: if value is in cache, read() does not make any query record_user.invalidate_recordset(['tags']) record_user.read(['tags']) # create a passing ir.rule self.env['ir.rule'].create({ 'model_id': self.env['ir.model']._get(record._name).id, 'domain_force': "[('id', '=', %d)]" % record.id, }) # prep the following query count by caching access check related data record_user.invalidate_recordset(['tags']) record_user.read(['tags']) # still only 1 query: reading pivot table # access rules are checked in python in this case with self.assertQueryCount(1): # trick: if value is in cache, read() does not make any query record_user.invalidate_recordset(['tags']) record_user.read(['tags']) # create a blocking ir.rule self.env['ir.rule'].create({ 'model_id': self.env['ir.model']._get(record._name).id, 'domain_force': "[('id', '!=', %d)]" % record.id, }) # ensure ir.rule is applied even when reading m2m with self.assertRaises(AccessError): record_user.read(['tags']) def test_98_prefetch_translate(self): Model = self.registry['test_new_api.prefetch'] # translated '_rec_name' field should be prefetched self.assertTrue(Model.name.prefetch) # translated fields should be prefetch=True by default self.assertTrue(Model.description.prefetch) self.assertTrue(Model.html_description.prefetch) # parameter 'prefetch' can be always overridden self.assertFalse(Model.rare_description.prefetch) self.assertFalse(Model.rare_html_description.prefetch) def test_98_unlink_recompute(self): move = self.env['test_new_api.move'].create({ 'line_ids': [(0, 0, {'quantity': 42})], }) line = move.line_ids self.assertEqual(move.quantity, 42) # create an ir.rule for lines that uses move.quantity self.env['ir.rule'].create({ 'model_id': self.env['ir.model']._get(line._name).id, 'domain_force': "[('move_id.quantity', '>=', 0)]", }) # unlink the line, and check the recomputation of move.quantity user = self.user_demo line.with_user(user).unlink() self.assertEqual(move.quantity, 0) def test_99_prefetch_group(self): records = self.env['test_new_api.prefetch'].create([{} for _ in range(10)]) self.env.flush_all() self.env.invalidate_all() with self.assertQueries([""" SELECT "test_new_api_prefetch"."id", "test_new_api_prefetch"."name"->>%s, "test_new_api_prefetch"."description"->>%s, "test_new_api_prefetch"."html_description"->>%s, "test_new_api_prefetch"."create_uid", "test_new_api_prefetch"."create_date", "test_new_api_prefetch"."write_uid", "test_new_api_prefetch"."write_date" FROM "test_new_api_prefetch" WHERE ("test_new_api_prefetch"."id" IN %s) """]): records.mapped('name') # fetch all fields with prefetch=True with self.assertQueries([""" SELECT "test_new_api_prefetch"."id", "test_new_api_prefetch"."harry", "test_new_api_prefetch"."hermione", "test_new_api_prefetch"."ron" FROM "test_new_api_prefetch" WHERE ("test_new_api_prefetch"."id" IN %s) """]): records.mapped('harry') # fetch all fields with prefetch='Harry Potter' records.mapped('hermione') # fetched already records.mapped('ron') # fetched already with self.assertQueries([""" SELECT "test_new_api_prefetch"."id", "test_new_api_prefetch"."hansel", "test_new_api_prefetch"."gretel" FROM "test_new_api_prefetch" WHERE ("test_new_api_prefetch"."id" IN %s) """]): records.mapped('hansel') # fetch all fields with prefetch='Hansel and Gretel' records.mapped('gretel') # fetched already self.env.invalidate_all() with self.assertQueryCount(4): records.mapped('name') # fetch all fields with prefetch=True records.mapped('hansel') # fetch all fields with prefetch='Hansel and Gretel' records.mapped('harry') # fetch all fields with prefetch='Harry Potter' records.mapped('rare_description') # fetch that field only def test_cache_key_invalidation(self): company0 = self.env.ref('base.main_company') company1 = self.env['res.company'].create({'name': 'A'}) user0 = self.env['res.users'].create({ 'name': 'Foo', 'login': 'foo', 'company_id': company0.id, 'company_ids': [Command.set([company0.id, company1.id])], }) # this uses company0 record = self.env['test_new_api.company'].with_user(user0).create({ 'foo': 'main', }) self.assertEqual(record.env.company, company0) self.assertEqual(record.foo, 'main') # change the user's company, so we implicitly switch to company1 user0.company_id = company1 self.assertEqual(record.env.company, company1) self.assertEqual(record.foo, False) def test_field_set_prefetch(self): records = self.env['test_new_api.prefetch'].create([ {'line_ids': [Command.create({})]}, {'line_ids': [Command.create({})]}, {'line_ids': [Command.create({})]}, {'line_ids': [Command.create({})]}, ]) # This test ensures that the prefetch set is preserved when using Field.__set__(), # which calls BaseModel.write(). The prefetch set is important for write() to # ensure that method modified() can batch the fetching of relational fields. # In this case, modifying 'harry' on a record should add a related field to # recompute through the one2many field 'line_ids', which we expect to be fetched # in batch with the prefetch set. # one query for modified, one for the records, one for their lines self.env.invalidate_all() with self.assertQueryCount(3): for index, record in enumerate(records): record.harry = index + 1 # same result by calling write() directly self.env.invalidate_all() with self.assertQueryCount(3): for index, record in enumerate(records): record.write({'harry': index + 2}) class TestX2many(TransactionExpressionCase): @classmethod def setUpClass(cls): super().setUpClass() cls.user_portal = cls.env['res.users'].sudo().search([('login', '=', 'portal')]) cls.partner_portal = cls.user_portal.partner_id if not cls.user_portal: cls.env['ir.config_parameter'].sudo().set_param('auth_password_policy.minlength', 4) cls.partner_portal = cls.env['res.partner'].create({ 'name': 'Joel Willis', 'email': 'joel.willis63@example.com', }) cls.user_portal = cls.env['res.users'].with_context(no_reset_password=True).create({ 'login': 'portal', 'password': 'portal', 'partner_id': cls.partner_portal.id, 'groups_id': [Command.set([cls.env.ref('base.group_portal').id])], }) def test_definition_many2many(self): """ Test the definition of inherited many2many fields. """ field = self.env['test_new_api.multi.line']._fields['tags'] self.assertEqual(field.relation, 'test_new_api_multi_line_test_new_api_multi_tag_rel') self.assertEqual(field.column1, 'test_new_api_multi_line_id') self.assertEqual(field.column2, 'test_new_api_multi_tag_id') field = self.env['test_new_api.multi.line2']._fields['tags'] self.assertEqual(field.relation, 'test_new_api_multi_line2_test_new_api_multi_tag_rel') self.assertEqual(field.column1, 'test_new_api_multi_line2_id') self.assertEqual(field.column2, 'test_new_api_multi_tag_id') def test_10_ondelete_many2many(self): """Test A can't be deleted when used on the relation.""" record_a = self.env['test_new_api.model_a'].create({'name': 'a'}) record_b = self.env['test_new_api.model_b'].create({'name': 'b'}) record_a.write({ 'a_restricted_b_ids': [Command.set(record_b.ids)], }) with self.assertRaises(psycopg2.IntegrityError): with mute_logger('odoo.sql_db'), self.cr.savepoint(): record_a.unlink() # Test B is still cascade. record_b.unlink() self.assertFalse(record_b.exists()) def test_11_ondelete_many2many(self): """Test B can't be deleted when used on the relation.""" record_a = self.env['test_new_api.model_a'].create({'name': 'a'}) record_b = self.env['test_new_api.model_b'].create({'name': 'b'}) record_a.write({ 'b_restricted_b_ids': [Command.set(record_b.ids)], }) with self.assertRaises(psycopg2.IntegrityError): with mute_logger('odoo.sql_db'), self.cr.savepoint(): record_b.unlink() # Test A is still cascade. record_a.unlink() self.assertFalse(record_a.exists()) def test_12_active_test_one2many(self): Model = self.env['test_new_api.model_active_field'] parent = Model.create({}) self.assertFalse(parent.children_ids) # create with implicit active_test=True in context child1, child2 = Model.create([ {'parent_id': parent.id, 'active': True}, {'parent_id': parent.id, 'active': False}, ]) act_children = child1 all_children = child1 + child2 self.assertEqual(parent.children_ids, act_children) self.assertEqual(parent.with_context(active_test=True).children_ids, act_children) self.assertEqual(parent.with_context(active_test=False).children_ids, all_children) # create with active_test=False in context child3, child4 = Model.with_context(active_test=False).create([ {'parent_id': parent.id, 'active': True}, {'parent_id': parent.id, 'active': False}, ]) act_children = child1 + child3 all_children = child1 + child2 + child3 + child4 self.assertEqual(parent.children_ids, act_children) self.assertEqual(parent.with_context(active_test=True).children_ids, act_children) self.assertEqual(parent.with_context(active_test=False).children_ids, all_children) # replace active children parent.write({'children_ids': [Command.set([child1.id])]}) act_children = child1 all_children = child1 + child2 + child4 self.assertEqual(parent.children_ids, act_children) self.assertEqual(parent.with_context(active_test=True).children_ids, act_children) self.assertEqual(parent.with_context(active_test=False).children_ids, all_children) # replace all children parent.with_context(active_test=False).write({'children_ids': [Command.set([child1.id])]}) act_children = child1 all_children = child1 self.assertEqual(parent.children_ids, act_children) self.assertEqual(parent.with_context(active_test=True).children_ids, act_children) self.assertEqual(parent.with_context(active_test=False).children_ids, all_children) # check recomputation of inactive records parent.write({'children_ids': [Command.set(child4.ids)]}) self.assertTrue(child4.parent_active) parent.active = False self.assertFalse(child4.parent_active) def test_12_active_test_one2many_with_context(self): Model = self.env['test_new_api.model_active_field'] parent = Model.create({}) all_children = Model.create([ {'parent_id': parent.id, 'active': True}, {'parent_id': parent.id, 'active': False}, ]) act_children = all_children[0] self.assertEqual(parent.children_ids, act_children) self.assertEqual(parent.with_context(active_test=True).children_ids, act_children) self.assertEqual(parent.with_context(active_test=False).children_ids, all_children) self.assertEqual(parent.all_children_ids, all_children) self.assertEqual(parent.with_context(active_test=True).all_children_ids, all_children) self.assertEqual(parent.with_context(active_test=False).all_children_ids, all_children) self.assertEqual(parent.active_children_ids, act_children) self.assertEqual(parent.with_context(active_test=True).active_children_ids, act_children) self.assertEqual(parent.with_context(active_test=False).active_children_ids, act_children) # check read() self.env.invalidate_all() self.assertEqual(parent.children_ids, act_children) self.assertEqual(parent.all_children_ids, all_children) self.assertEqual(parent.active_children_ids, act_children) self.env.invalidate_all() self.assertEqual(parent.with_context(active_test=False).children_ids, all_children) self.assertEqual(parent.with_context(active_test=False).all_children_ids, all_children) self.assertEqual(parent.with_context(active_test=False).active_children_ids, act_children) def test_12_active_test_one2many_search(self): # TODO use _search, filtered domains behaves strangely for hierarchies Model = self.env['test_new_api.model_active_field'] parent = Model.create({ 'children_ids': [ Command.create({'name': 'A', 'active': True}), Command.create({'name': 'B', 'active': False}), ], }) # a one2many field without context does not match its inactive children self.assertIn(parent, Model.search([('children_ids.name', '=', 'A')])) self.assertNotIn(parent, Model.search([('children_ids.name', '=', 'B')])) # Same result when it used name_search self.assertIn(parent, Model.search([('children_ids', '=', 'A')])) self.assertNotIn(parent, Model.search([('children_ids', '=', 'B')])) # Same result with the child_of operator self.assertIn(parent, Model.search([('children_ids', 'child_of', 'A')])) self.assertNotIn(parent, Model.search([('children_ids', 'child_of', 'B')])) # a one2many field with active_test=False matches its inactive children self.assertIn(parent, Model.search([('all_children_ids.name', '=', 'A')])) self.assertIn(parent, Model.search([('all_children_ids.name', '=', 'B')])) # Same result when it used name_search self.assertIn(parent, Model.search([('all_children_ids', '=', 'A')])) # Same result with the child_of operator self.assertIn(parent, Model.search([('all_children_ids', 'child_of', 'A')])) self.assertIn(parent, Model.search([('all_children_ids', '=', 'B')])) # Same result with the child_of operator self.assertIn(parent, Model.search([('all_children_ids', 'child_of', 'A')])) self.assertIn(parent, Model.search([('all_children_ids', 'child_of', 'B')])) def test_12_active_test_many2many_search(self): # TODO use _search, filtered domains behaves strangely for hierarchies Model = self.env['test_new_api.model_active_field'] parent = Model.create({ 'relatives_ids': [ Command.create({'name': 'A', 'active': True}), Command.create({'name': 'B', 'active': False}), ], }) child_a, child_b = parent.with_context(active_test=False).relatives_ids # a many2many field without context does not match its inactive children self.assertIn(parent, Model.search([('relatives_ids.name', '=', 'A')])) self.assertNotIn(parent, Model.search([('relatives_ids.name', '=', 'B')])) # Same result when it used name_search self.assertIn(parent, Model.search([('relatives_ids', '=', 'A')])) self.assertNotIn(parent, Model.search([('relatives_ids', '=', 'B')])) # Same result with the child_of operator self.assertIn(parent, Model.search([('relatives_ids', 'child_of', child_a.id)])) self.assertIn(parent, Model.search([('relatives_ids', 'child_of', 'A')])) self.assertNotIn(parent, Model.search([('relatives_ids', 'child_of', child_b.id)])) self.assertNotIn(parent, Model.search([('relatives_ids', 'child_of', 'B')])) # a many2many field with active_test=False matches its inactive children self.assertIn(parent, Model.search([('all_relatives_ids.name', '=', 'A')])) self.assertIn(parent, Model.search([('all_relatives_ids.name', '=', 'B')])) # Same result when it used name_search self.assertIn(parent, Model.search([('all_relatives_ids', '=', 'A')])) self.assertIn(parent, Model.search([('all_relatives_ids', '=', 'B')])) # Same result with the child_of operator self.assertIn(parent, Model.search([('all_relatives_ids', 'child_of', child_a.id)])) self.assertIn(parent, Model.search([('all_relatives_ids', 'child_of', 'A')])) self.assertIn(parent, Model.search([('all_relatives_ids', 'child_of', child_b.id)])) self.assertIn(parent, Model.search([('all_relatives_ids', 'child_of', 'B')])) def test_search_many2many(self): """ Tests search on many2many fields. """ tags = self.env['test_new_api.multi.tag'] tagA = tags.create({}) tagB = tags.create({}) tagC = tags.create({}) recs = self.env['test_new_api.multi.line'] recW = recs.create({}) recX = recs.create({'tags': [Command.link(tagA.id)]}) recY = recs.create({'tags': [Command.link(tagB.id)]}) recZ = recs.create({'tags': [Command.link(tagA.id), Command.link(tagB.id)]}) recs = recW + recX + recY + recZ # test 'in' result = self._search(recs, [('tags', 'in', (tagA + tagB).ids)]) self.assertEqual(result, recX + recY + recZ) result = self._search(recs, [('tags', 'in', tagA.ids)]) self.assertEqual(result, recX + recZ) result = self._search(recs, [('tags', 'in', tagB.ids)]) self.assertEqual(result, recY + recZ) result = self._search(recs, [('tags', 'in', tagC.ids)]) self.assertEqual(result, recs.browse()) result = self._search(recs, [('tags', 'in', [])]) self.assertEqual(result, recs.browse()) # test 'not in' result = self._search(recs, [('id', 'in', recs.ids), ('tags', 'not in', (tagA + tagB).ids)]) self.assertEqual(result, recs - recX - recY - recZ) result = self._search(recs, [('id', 'in', recs.ids), ('tags', 'not in', tagA.ids)]) self.assertEqual(result, recs - recX - recZ) result = self._search(recs, [('id', 'in', recs.ids), ('tags', 'not in', tagB.ids)]) self.assertEqual(result, recs - recY - recZ) result = self._search(recs, [('id', 'in', recs.ids), ('tags', 'not in', tagC.ids)]) self.assertEqual(result, recs) result = self._search(recs, [('id', 'in', recs.ids), ('tags', 'not in', [])]) self.assertEqual(result, recs) # special case: compare with False result = self._search(recs, [('id', 'in', recs.ids), ('tags', '=', False)]) self.assertEqual(result, recW) result = self._search(recs, [('id', 'in', recs.ids), ('tags', '!=', False)]) self.assertEqual(result, recs - recW) def test_search_one2many(self): """ Tests search on one2many fields. """ recs = self.env['test_new_api.multi'] recX = recs.create({'lines': [Command.create({}), Command.create({})]}) recY = recs.create({'lines': [Command.create({})]}) recZ = recs.create({}) recs = recX + recY + recZ line1, line2, line3 = recs.lines line4 = recs.create({'lines': [Command.create({})]}).lines line0 = line4.create({}) # test 'in' result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'in', (line1 + line2 + line3 + line4).ids)]) self.assertEqual(result, recX + recY) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'in', (line1 + line3 + line4).ids)]) self.assertEqual(result, recX + recY) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'in', (line1 + line4).ids)]) self.assertEqual(result, recX) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'in', line4.ids)]) self.assertEqual(result, recs.browse()) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'in', [])]) self.assertEqual(result, recs.browse()) # test 'not in' result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', (line1 + line2 + line3).ids)]) self.assertEqual(result, recs - recX - recY) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', (line1 + line3).ids)]) self.assertEqual(result, recs - recX - recY) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', line1.ids)]) self.assertEqual(result, recs - recX) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', (line1 + line4).ids)]) self.assertEqual(result, recs - recX) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', line4.ids)]) self.assertEqual(result, recs) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', [])]) self.assertEqual(result, recs) # test 'not in' where the lines contain NULL values result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', (line1 + line0).ids)]) self.assertEqual(result, recs - recX) result = self._search(recs, [('id', 'in', recs.ids), ('lines', 'not in', line0.ids)]) self.assertEqual(result, recs) # special case: compare with False result = self._search(recs, [('id', 'in', recs.ids), ('lines', '=', False)]) self.assertEqual(result, recZ) result = self._search(recs, [('id', 'in', recs.ids), ('lines', '!=', False)]) self.assertEqual(result, recs - recZ) def test_create_batch_m2m(self): lines = self.env['test_new_api.multi.line'].create([{ 'tags': [Command.create({'name': str(j)}) for j in range(3)], } for i in range(3)]) self.assertEqual(len(lines), 3) for line in lines: self.assertEqual(len(line.tags), 3) def test_custom_m2m(self): model_id = self.env['ir.model']._get_id('res.partner') field = self.env['ir.model.fields'].create({ 'name': 'x_foo', 'field_description': 'Foo', 'model_id': model_id, 'ttype': 'many2many', 'relation': 'res.country', 'store': False, }) self.assertTrue(field.unlink()) def test_custom_m2m_related(self): # this checks the ondelete of a related many2many field model_id = self.env['ir.model']._get_id('res.partner') field = self.env['ir.model.fields'].create({ 'name': 'x_foo', 'field_description': 'Foo', 'model_id': model_id, 'ttype': 'many2many', 'relation': 'res.partner.category', 'related': 'category_id', 'readonly': True, 'store': True, }) self.assertTrue(field.unlink()) @mute_logger('odoo.addons.base.models.ir_model') @users('portal') def test_sudo_commands(self): """Test manipulating a x2many field using Commands with `sudo` or with another user (`with_user`) is not allowed when the destination model is flagged `_allow_sudo_commands = False` and the transaction user does not have the required access rights. This test asserts an AccessError is raised when a user attempts to pass Commands to a One2many and Many2many field targeting a model flagged with `_allow_sudo_commands = False` while using an environment with `sudo()` or `with_user(admin_user)`. The `with_user` are edge cases in some business codes, where a more-priviledged user is used temporary to perform an action, such as: - `Documents.with_user(share.create_uid)` - `request.env['sign.request'].with_user(contract.hr_responsible_id).sudo()` """ admin_user = self.env.ref('base.user_admin') my_user = self.env.user.sudo(False) # 1. one2many field `res.partner.user_ids` # Sanity checks # `res.partner` must be flagged as `_allow_sudo_commands = False` otherwise the test is pointless self.assertEqual(self.env['res.users']._allow_sudo_commands, False) # in case the type of `res.partner.user_ids` changes in a future release. # if `res.partner.user_ids` is no longer a one2many, this test must be adapted. self.assertEqual(self.env['res.partner']._fields['user_ids'].type, 'one2many') my_partner = my_user.partner_id for Partner, my_partner in [ (self.env['res.partner'].with_user(admin_user), my_partner.with_user(admin_user)), (self.env['res.partner'].sudo(), my_partner.sudo()), ]: # 1.0 Command.CREATE # Case: a public/portal user creating a new users with arbitrary values with self.assertRaisesRegex(AccessError, "not allowed to create 'User'"): Partner.create({ 'name': 'foo', 'user_ids': [Command.create({ 'login': 'foo', 'password': 'foo', })], }) # 1.1 Command.UPDATE # Case: a public/portal updating his user to add himself a group with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"): my_partner.write({ 'user_ids': [Command.update(my_partner.user_ids[0].id, { 'groups_id': [self.env.ref('base.group_system').id], })], }) # 1.2 Command.DELETE # Case: a public user deleting the public user to mess with the database with self.assertRaisesRegex(AccessError, "not allowed to delete 'User'"): my_partner.write({ 'user_ids': [Command.delete(my_partner.user_ids[0].id)], }) # 1.3 Command.UNLINK # Case: a public user unlinking the public partner and the public user to mess with the database with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"): my_partner.write({ 'user_ids': [Command.unlink(my_partner.user_ids[0].id)], }) # 1.4 Command.LINK # Case: a public/portal user changing the `partner_id` of an admin, # to change the email address of the user and ask for a reset password. with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"): my_partner.write({ 'user_ids': [Command.link(admin_user.id)], }) # 1.5 Command.CLEAR # Case: a public user unlinking the public partner and the public user just to mess with the database with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"): my_partner.write({ 'user_ids': [Command.clear()], }) # 1.6 Command.SET # Case: a public/portal user changing the `partner_id` of an admin, # to change the email address of the user and ask for a reset password. with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"): my_partner.write({ 'user_ids': [Command.set([admin_user.id])], }) # 2. many2many field `test_new_api.discussion.participants` # Sanity checks # `test_new_api.user` must be flagged as `_allow_sudo_commands = False` otherwise the test is pointless self.assertEqual(self.env['test_new_api.group']._allow_sudo_commands, False) # in case the type of `test_new_api.discussion.participants` changes in a future release. # if `test_new_api.discussion.participants` is no longer a many2many, this test must be adapted. self.assertEqual(self.env['test_new_api.user']._fields['group_ids'].type, 'many2many') public_group = self.env['test_new_api.group'].with_user(admin_user).create({ 'name': 'public' }).with_user(self.env.user) my_user = self.env['test_new_api.user'].with_user(admin_user).create({ 'name': 'foo', 'group_ids': [public_group.id], }).with_user(self.env.user) for User, my_user in [ (self.env['test_new_api.user'].with_user(admin_user), my_user.with_user(admin_user)), (self.env['test_new_api.user'].sudo(), my_user.sudo()), ]: # 2.0 Command.CREATE # Case: a public/portal user creating a new users with arbitrary values with self.assertRaisesRegex(AccessError, "not allowed to create 'test_new_api.group'"): User.create({ 'name': 'foo', 'group_ids': [Command.create({})], }) # 2.1 Command.UPDATE # Case: a public/portal updating his user to add himself a group with self.assertRaisesRegex(AccessError, "not allowed to modify 'test_new_api.group'"): my_user.write({ 'group_ids': [Command.update(my_user.group_ids[0].id, {})], }) # 2.2 Command.DELETE # Case: a public user deleting the public user to mess with the database with self.assertRaisesRegex(AccessError, "not allowed to delete 'test_new_api.group'"): my_user.write({ 'group_ids': [Command.delete(my_user.group_ids[0].id)], }) class TestHtmlField(TransactionCase): def setUp(self): super(TestHtmlField, self).setUp() self.model = self.env['test_new_api.mixed'] def test_00_sanitize(self): self.assertEqual(self.model._fields['comment1'].sanitize, False) self.assertEqual(self.model._fields['comment2'].sanitize_attributes, True) self.assertEqual(self.model._fields['comment2'].strip_classes, False) self.assertEqual(self.model._fields['comment3'].sanitize_attributes, True) self.assertEqual(self.model._fields['comment3'].strip_classes, True) some_ugly_html = """

Oops this should maybe be sanitized % if object.some_field and not object.oriented: % if object.other_field: ${object.mako_thing} This is some html. % endif %if object.dummy_field:

Youpie

%endif""" record = self.model.create({ 'comment1': some_ugly_html, 'comment2': some_ugly_html, 'comment3': some_ugly_html, 'comment4': some_ugly_html, }) self.assertEqual(record.comment1, some_ugly_html, 'Error in HTML field: content was sanitized but field has sanitize=False') self.assertIn('é@ 

' write_vals = {'comment5': val} # Once sent through `html_sanitize()` this is becoming: # `é@\xa0

` # Notice those change: # - `attr1 =` -> `attr1=` (space before `=`) # - ` attr2` -> ` attr2` (multi space -> single space) # - `=\'attr2\'` -> `="attr2"` (escaped single quote -> double quote) # - ` ` -> `\xa0` # Still, those 2 archs should be considered equals and not raise record.with_user(bypass_user).write(write_vals) # Next write shouldn't raise a sanitize right error record.with_user(internal_user).write(write_vals) # 4. Ensure our exception handling is fine val = '' write_vals = {'comment5': val} record.with_user(internal_user).write(write_vals) self.assertEqual(record.comment5, '', "should be sanitized (not in groups)") # extra test with new record having 'record' as origin new_record = record.new(origin=record) new_record.with_user(bypass_user).comment5 # this was causing an infinite recursion (see explanation in fields.py) new_record.invalidate_recordset() new_record.with_user(internal_user).comment5 @patch('odoo.fields.html_sanitize', return_value='

comment

') def test_onchange_sanitize(self, patch): self.assertTrue(self.registry['test_new_api.mixed'].comment2.sanitize) record = self.env['test_new_api.mixed'].create({ 'comment2': '

comment

', }) # in a perfect world this should be 1, but at the moment the value is # sanitized more than once during creation of the record self.assertEqual(patch.call_count, 2) # new value needs to be validated, so it is sanitized once more record.comment2 = '

comment

' self.assertEqual(patch.call_count, 3) # the value is already sanitized for flushing record.flush_recordset() self.assertEqual(patch.call_count, 3) # value coming from db does not need to be sanitized record.invalidate_recordset() record.comment2 self.assertEqual(patch.call_count, 3) # value coming from db during an onchange does not need to be sanitized new_record = record.new(origin=record) new_record.comment2 self.assertEqual(patch.call_count, 3) class TestMagicFields(TransactionCase): def test_write_date(self): record = self.env['test_new_api.discussion'].create({'name': 'Booba'}) self.assertEqual(record.create_uid, self.env.user) self.assertEqual(record.write_uid, self.env.user) def test_mro_mixin(self): # Mixin # | # | # | # ExtendedDisplay 'test_new_api.mixin' Display 'base' # | | | | # +----------------------+-+--------------+---------+ # | # 'test_new_api.display' # # The field 'display_name' is defined as store=True on the class Display # above. The field 'display_name' on the model 'test_new_api.mixin' is # expected to be automatic and non-stored. But the field 'display_name' # on the model 'test_new_api.display' should not be automatic: it must # correspond to the definition given in class Display, even if the MRO # of the model shows the automatic field on the mixin model before the # actual definition. registry = self.env.registry models = registry.models # check setup of models in alphanumeric order self.patch(registry, 'models', OrderedDict(sorted(models.items()))) registry.setup_models(self.cr) field = registry['test_new_api.display'].display_name self.assertFalse(field.automatic) self.assertTrue(field.store) # check setup of models in reverse alphanumeric order self.patch(registry, 'models', OrderedDict(sorted(models.items(), reverse=True))) registry.setup_models(self.cr) field = registry['test_new_api.display'].display_name self.assertFalse(field.automatic) self.assertTrue(field.store) class TestParentStore(TransactionCase): def setUp(self): super(TestParentStore, self).setUp() # make a tree of categories: # 0 # /|\ # 1 2 3 # /|\ # 4 5 6 # /|\ # 7 8 9 Cat = self.env['test_new_api.category'] cat0 = Cat.create({'name': '0'}) cat1 = Cat.create({'name': '1', 'parent': cat0.id}) cat2 = Cat.create({'name': '2', 'parent': cat0.id}) cat3 = Cat.create({'name': '3', 'parent': cat0.id}) cat4 = Cat.create({'name': '4', 'parent': cat3.id}) cat5 = Cat.create({'name': '5', 'parent': cat3.id}) cat6 = Cat.create({'name': '6', 'parent': cat3.id}) cat7 = Cat.create({'name': '7', 'parent': cat6.id}) cat8 = Cat.create({'name': '8', 'parent': cat6.id}) cat9 = Cat.create({'name': '9', 'parent': cat6.id}) self._cats = Cat.concat(cat0, cat1, cat2, cat3, cat4, cat5, cat6, cat7, cat8, cat9) def cats(self, *indexes): """ Return the given categories. """ ids = self._cats.ids return self._cats.browse([ids[index] for index in indexes]) def assertChildOf(self, category, children): self.assertEqual(category.search([('id', 'child_of', category.ids)]), children) def assertParentOf(self, category, parents): self.assertEqual(category.search([('id', 'parent_of', category.ids)]), parents) def test_base(self): """ Check the initial tree structure. """ self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(1), self.cats(1)) self.assertChildOf(self.cats(2), self.cats(2)) self.assertChildOf(self.cats(3), self.cats(3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(4), self.cats(4)) self.assertChildOf(self.cats(5), self.cats(5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertChildOf(self.cats(7), self.cats(7)) self.assertChildOf(self.cats(8), self.cats(8)) self.assertChildOf(self.cats(9), self.cats(9)) self.assertParentOf(self.cats(0), self.cats(0)) self.assertParentOf(self.cats(1), self.cats(0, 1)) self.assertParentOf(self.cats(2), self.cats(0, 2)) self.assertParentOf(self.cats(3), self.cats(0, 3)) self.assertParentOf(self.cats(4), self.cats(0, 3, 4)) self.assertParentOf(self.cats(5), self.cats(0, 3, 5)) self.assertParentOf(self.cats(6), self.cats(0, 3, 6)) self.assertParentOf(self.cats(7), self.cats(0, 3, 6, 7)) self.assertParentOf(self.cats(8), self.cats(0, 3, 6, 8)) self.assertParentOf(self.cats(9), self.cats(0, 3, 6, 9)) def test_base_compute(self): """ Check the tree structure after computation from scratch. """ self.cats()._parent_store_compute() self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(1), self.cats(1)) self.assertChildOf(self.cats(2), self.cats(2)) self.assertChildOf(self.cats(3), self.cats(3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(4), self.cats(4)) self.assertChildOf(self.cats(5), self.cats(5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertChildOf(self.cats(7), self.cats(7)) self.assertChildOf(self.cats(8), self.cats(8)) self.assertChildOf(self.cats(9), self.cats(9)) self.assertParentOf(self.cats(0), self.cats(0)) self.assertParentOf(self.cats(1), self.cats(0, 1)) self.assertParentOf(self.cats(2), self.cats(0, 2)) self.assertParentOf(self.cats(3), self.cats(0, 3)) self.assertParentOf(self.cats(4), self.cats(0, 3, 4)) self.assertParentOf(self.cats(5), self.cats(0, 3, 5)) self.assertParentOf(self.cats(6), self.cats(0, 3, 6)) self.assertParentOf(self.cats(7), self.cats(0, 3, 6, 7)) self.assertParentOf(self.cats(8), self.cats(0, 3, 6, 8)) self.assertParentOf(self.cats(9), self.cats(0, 3, 6, 9)) def test_delete(self): """ Delete a node. """ self.cats(6).unlink() self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5)) self.assertChildOf(self.cats(3), self.cats(3, 4, 5)) self.assertChildOf(self.cats(5), self.cats(5)) self.assertParentOf(self.cats(0), self.cats(0)) self.assertParentOf(self.cats(3), self.cats(0, 3)) self.assertParentOf(self.cats(5), self.cats(0, 3, 5)) def test_move_1_0(self): """ Move a node to a root position. """ self.cats(6).write({'parent': False}) self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5)) self.assertChildOf(self.cats(3), self.cats(3, 4, 5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertParentOf(self.cats(9), self.cats(6, 9)) def test_move_1_1(self): """ Move a node into an empty subtree. """ self.cats(6).write({'parent': self.cats(1).id}) self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(1), self.cats(1, 6, 7, 8, 9)) self.assertChildOf(self.cats(3), self.cats(3, 4, 5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertParentOf(self.cats(9), self.cats(0, 1, 6, 9)) def test_move_1_N(self): """ Move a node into a non-empty subtree. """ self.cats(6).write({'parent': self.cats(0).id}) self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(3), self.cats(3, 4, 5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertParentOf(self.cats(9), self.cats(0, 6, 9)) def test_move_N_0(self): """ Move multiple nodes to root position. """ self.cats(5, 6).write({'parent': False}) self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4)) self.assertChildOf(self.cats(3), self.cats(3, 4)) self.assertChildOf(self.cats(5), self.cats(5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertParentOf(self.cats(5), self.cats(5)) self.assertParentOf(self.cats(9), self.cats(6, 9)) def test_move_N_1(self): """ Move multiple nodes to an empty subtree. """ self.cats(5, 6).write({'parent': self.cats(1).id}) self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(1), self.cats(1, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(3), self.cats(3, 4)) self.assertChildOf(self.cats(5), self.cats(5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertParentOf(self.cats(5), self.cats(0, 1, 5)) self.assertParentOf(self.cats(9), self.cats(0, 1, 6, 9)) def test_move_N_N(self): """ Move multiple nodes to a non- empty subtree. """ self.cats(5, 6).write({'parent': self.cats(0).id}) self.assertChildOf(self.cats(0), self.cats(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) self.assertChildOf(self.cats(3), self.cats(3, 4)) self.assertChildOf(self.cats(5), self.cats(5)) self.assertChildOf(self.cats(6), self.cats(6, 7, 8, 9)) self.assertParentOf(self.cats(5), self.cats(0, 5)) self.assertParentOf(self.cats(9), self.cats(0, 6, 9)) def test_move_1_cycle(self): """ Move a node to create a cycle. """ with self.assertRaises(UserError): self.cats(3).write({'parent': self.cats(9).id}) def test_move_N_cycle(self): """ Move multiple nodes to create a cycle. """ with self.assertRaises(UserError): self.cats(1, 3).write({'parent': self.cats(9).id}) def test_compute_depend_parent_path(self): self.assertEqual(self.cats(7).depth, 3) self.assertEqual(self.cats(8).depth, 3) self.assertEqual(self.cats(9).depth, 3) # change parent of node to have 2 parents self.cats(7).parent = self.cats(2) self.assertEqual(self.cats(7).depth, 2) # change parent of node to root self.cats(7).parent = False self.assertEqual(self.cats(7).depth, 0) # change grand-parent of nodes self.cats(6).parent = self.cats(0) self.assertEqual(self.cats(8).depth, 2) self.assertEqual(self.cats(9).depth, 2) # add a new node: one query to INSERT, one query to UPDATE parent_path with self.assertQueryCount(2): cat = self.cats().create({'name': '10', 'parent': self.cats(6).id}) self.assertEqual(cat.depth, 2) class TestRequiredMany2one(TransactionCase): def test_explicit_ondelete(self): field = self.env['test_new_api.req_m2o']._fields['foo'] self.assertEqual(field.ondelete, 'cascade') def test_implicit_ondelete(self): field = self.env['test_new_api.req_m2o']._fields['bar'] self.assertEqual(field.ondelete, 'restrict') def test_explicit_set_null(self): Model = self.env['test_new_api.req_m2o'] field = Model._fields['foo'] # clean up registry after this test self.addCleanup(self.registry.reset_changes) self.patch(field, 'ondelete', 'set null') with self.assertRaises(ValueError): field.setup_nonrelated(Model) class TestRequiredMany2oneTransient(TransactionCase): def test_explicit_ondelete(self): field = self.env['test_new_api.req_m2o_transient']._fields['foo'] self.assertEqual(field.ondelete, 'restrict') def test_implicit_ondelete(self): field = self.env['test_new_api.req_m2o_transient']._fields['bar'] self.assertEqual(field.ondelete, 'cascade') def test_explicit_set_null(self): Model = self.env['test_new_api.req_m2o_transient'] field = Model._fields['foo'] # clean up registry after this test self.addCleanup(self.registry.reset_changes) self.patch(field, 'ondelete', 'set null') with self.assertRaises(ValueError): field.setup_nonrelated(Model) @tagged('m2oref') class TestMany2oneReference(TransactionExpressionCase): def test_delete_m2o_reference_records(self): m = self.env['test_new_api.model_many2one_reference'] self.env.cr.execute("SELECT max(id) FROM test_new_api_model_many2one_reference") ids = self.env.cr.fetchone() # fake record to emulate the unlink of a non-existant record foo = m.browse(1 if not ids[0] else (ids[0] + 1)) self.assertTrue(foo.unlink()) def test_search_inverse_one2many_autojoin(self): record = self.env['test_new_api.inverse_m2o_ref'].create({}) # the one2many field 'model_ids' should be auto_join=True self.patch(type(record).model_ids, 'auto_join', True) # create a reference to record reference = self.env['test_new_api.model_many2one_reference'].create({'res_id': record.id}) reference.res_model = record._name # the model field 'res_model' is not in database yet self.assertTrue(self.env.cache.has_dirty_fields(reference, [type(reference).res_model])) # searching on the one2many should flush the field 'res_model' records = record.search([('model_ids.create_date', '!=', False)]) self.assertIn(record, records) @tagged('selection_abstract') class TestSelectionDeleteUpdate(TransactionCase): MODEL_ABSTRACT = 'test_new_api.state_mixin' def setUp(self): super().setUp() # enable unlinking ir.model.fields.selection self.patch(self.registry, 'ready', False) def test_unlink_asbtract(self): self.env['ir.model.fields.selection'].search([ ('field_id.model', '=', self.MODEL_ABSTRACT), ('field_id.name', '=', 'state'), ('value', '=', 'confirmed'), ], limit=1).unlink() @tagged('selection_update_base') class TestSelectionUpdates(TransactionCase): MODEL_BASE = 'test_new_api.model_selection_base' MODEL_RELATED = 'test_new_api.model_selection_related' MODEL_RELATED_UPDATE = 'test_new_api.model_selection_related_updatable' @classmethod def setUpClass(cls): super().setUpClass() # Specifying a lang in env/context should not increase query counts # of CRUD operations cls.env = cls.env(context={'lang': 'en_US'}) def test_selection(self): self.env[self.MODEL_BASE].create({}) # warming up with self.assertQueryCount(1): self.env[self.MODEL_BASE].create({}) with self.assertQueryCount(1): record = self.env[self.MODEL_BASE].create({'my_selection': 'foo'}) with self.assertQueryCount(1): record.my_selection = 'bar' def test_selection_related_readonly(self): related_record = self.env[self.MODEL_BASE].create({'my_selection': 'foo'}) with self.assertQueryCount(2): # defaults (readonly related field), INSERT record = self.env[self.MODEL_RELATED].create({'selection_id': related_record.id}) with self.assertQueryCount(0): record.related_selection = 'bar' def test_selection_related(self): related_record = self.env[self.MODEL_BASE].create({'my_selection': 'foo'}) with self.assertQueryCount(2): # defaults (related field), INSERT record = self.env[self.MODEL_RELATED_UPDATE].create({'selection_id': related_record.id}) with self.assertQueryCount(2): record.related_selection = 'bar' @tagged('selection_ondelete_base') class TestSelectionOndelete(TransactionCase): MODEL_BASE = 'test_new_api.model_selection_base' MODEL_REQUIRED = 'test_new_api.model_selection_required' MODEL_NONSTORED = 'test_new_api.model_selection_non_stored' MODEL_WRITE_OVERRIDE = 'test_new_api.model_selection_required_for_write_override' def setUp(self): super().setUp() # enable unlinking ir.model.fields.selection self.patch(self.registry, 'ready', False) def _unlink_option(self, model, option): self.env['ir.model.fields.selection'].search([ ('field_id.model', '=', model), ('field_id.name', '=', 'my_selection'), ('value', '=', option), ], limit=1).unlink() def test_ondelete_default(self): # create some records, one of which having the extended selection option rec1 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'foo'}) rec2 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'bar'}) rec3 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'baz'}) # test that all values are correct before the removal of the value self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'baz') # unlink the extended option (simulates a module uninstall) self._unlink_option(self.MODEL_REQUIRED, 'baz') # verify that the ondelete policy has succesfully been applied self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'foo') # reset to default def test_ondelete_base_null_explicit(self): rec1 = self.env[self.MODEL_BASE].create({'my_selection': 'foo'}) rec2 = self.env[self.MODEL_BASE].create({'my_selection': 'bar'}) rec3 = self.env[self.MODEL_BASE].create({'my_selection': 'quux'}) self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'quux') self._unlink_option(self.MODEL_BASE, 'quux') self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertFalse(rec3.my_selection) def test_ondelete_base_null_implicit(self): rec1 = self.env[self.MODEL_BASE].create({'my_selection': 'foo'}) rec2 = self.env[self.MODEL_BASE].create({'my_selection': 'bar'}) rec3 = self.env[self.MODEL_BASE].create({'my_selection': 'ham'}) self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'ham') self._unlink_option(self.MODEL_BASE, 'ham') self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertFalse(rec3.my_selection) def test_ondelete_cascade(self): rec1 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'foo'}) rec2 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'bar'}) rec3 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'eggs'}) self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'eggs') self._unlink_option(self.MODEL_REQUIRED, 'eggs') self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertFalse(rec3.exists()) def test_ondelete_literal(self): rec1 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'foo'}) rec2 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'bar'}) rec3 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'bacon'}) self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'bacon') self._unlink_option(self.MODEL_REQUIRED, 'bacon') self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'bar') def test_ondelete_multiple_explicit(self): rec1 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'foo'}) rec2 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'eevee'}) rec3 = self.env[self.MODEL_REQUIRED].create({'my_selection': 'pikachu'}) self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'eevee') self.assertEqual(rec3.my_selection, 'pikachu') self._unlink_option(self.MODEL_REQUIRED, 'eevee') self._unlink_option(self.MODEL_REQUIRED, 'pikachu') self.assertEqual(rec1.my_selection, 'foo') self.assertEqual(rec2.my_selection, 'bar') self.assertEqual(rec3.my_selection, 'foo') def test_ondelete_callback(self): rec = self.env[self.MODEL_REQUIRED].create({'my_selection': 'knickers'}) self.assertEqual(rec.my_selection, 'knickers') self._unlink_option(self.MODEL_REQUIRED, 'knickers') self.assertEqual(rec.my_selection, 'foo') self.assertFalse(rec.active) def test_non_stored_selection(self): rec = self.env[self.MODEL_NONSTORED].create({}) rec.my_selection = 'foo' self.assertEqual(rec.my_selection, 'foo') self._unlink_option(self.MODEL_NONSTORED, 'foo') self.assertFalse(rec.my_selection) def test_required_base_selection_field(self): # test that no ondelete action is executed on a required selection field that is not # extended, only required fields that extend it with selection_add should # have ondelete actions defined rec = self.env[self.MODEL_REQUIRED].create({'my_selection': 'foo'}) self.assertEqual(rec.my_selection, 'foo') self._unlink_option(self.MODEL_REQUIRED, 'foo') self.assertEqual(rec.my_selection, 'foo') @mute_logger('odoo.addons.base.models.ir_model') def test_write_override_selection(self): # test that on override to write that raises an error does not prevent the ondelete # policy from executing and cleaning up what needs to be cleaned up rec = self.env[self.MODEL_WRITE_OVERRIDE].create({'my_selection': 'divinity'}) self.assertEqual(rec.my_selection, 'divinity') self._unlink_option(self.MODEL_WRITE_OVERRIDE, 'divinity') self.assertEqual(rec.my_selection, 'foo') @tagged('selection_ondelete_advanced') class TestSelectionOndeleteAdvanced(TransactionCase): MODEL_BASE = 'test_new_api.model_selection_base' MODEL_REQUIRED = 'test_new_api.model_selection_required' def setUp(self): super().setUp() # necessary cleanup for resetting changes in the registry for model_name in (self.MODEL_BASE, self.MODEL_REQUIRED): Model = self.registry[model_name] self.addCleanup(setattr, Model, '_BaseModel__base_classes', Model._BaseModel__base_classes) def test_ondelete_unexisting_policy(self): class Foo(models.Model): _module = None _inherit = self.MODEL_REQUIRED my_selection = fields.Selection(selection_add=[ ('random', "Random stuff"), ], ondelete={'random': 'poop'}) Foo._build_model(self.registry, self.env.cr) with self.assertRaises(ValueError): self.registry.setup_models(self.env.cr) def test_ondelete_default_no_default(self): class Foo(models.Model): _module = None _inherit = self.MODEL_BASE my_selection = fields.Selection(selection_add=[ ('corona', "Corona beers suck"), ], ondelete={'corona': 'set default'}) Foo._build_model(self.registry, self.env.cr) with self.assertRaises(AssertionError): self.registry.setup_models(self.env.cr) def test_ondelete_value_no_valid(self): class Foo(models.Model): _module = None _inherit = self.MODEL_BASE my_selection = fields.Selection(selection_add=[ ('westvleteren', "Westvleteren beers is overrated"), ], ondelete={'westvleteren': 'set foooo'}) Foo._build_model(self.registry, self.env.cr) with self.assertRaises(AssertionError): self.registry.setup_models(self.env.cr) def test_ondelete_required_null_explicit(self): class Foo(models.Model): _module = None _inherit = self.MODEL_REQUIRED my_selection = fields.Selection(selection_add=[ ('brap', "Brap"), ], ondelete={'brap': 'set null'}) Foo._build_model(self.registry, self.env.cr) with self.assertRaises(ValueError): self.registry.setup_models(self.env.cr) def test_ondelete_required_null_implicit(self): class Foo(models.Model): _module = None _inherit = self.MODEL_REQUIRED my_selection = fields.Selection(selection_add=[ ('boing', "Boyoyoyoing"), ]) Foo._build_model(self.registry, self.env.cr) with self.assertRaises(ValueError): self.registry.setup_models(self.env.cr) class TestFieldParametersValidation(TransactionCase): def test_invalid_parameter(self): class Foo(models.Model): _module = None _name = _description = 'test_new_api.field_parameter_validation' name = fields.Char(invalid_parameter=42) Foo._build_model(self.registry, self.env.cr) self.addCleanup(self.registry.__delitem__, Foo._name) with self.assertLogs('odoo.fields', level='WARNING') as cm: self.registry.setup_models(self.env.cr) self.assertTrue(cm.output[0].startswith( "WARNING:odoo.fields:Field test_new_api.field_parameter_validation.name: " "unknown parameter 'invalid_parameter'" )) def select(model, *fnames): """ Return the expected query string to SELECT the given columns. """ table = model._table model_fields = model._fields terms = ", ".join( f'"{table}"."{fname}"' if not model_fields[fname].translate else f'"{table}"."{fname}"->>%s' for fname in ['id'] + list(fnames) ) return f'SELECT {terms} FROM "{table}" WHERE ("{table}"."id" IN %s)' def insert(model, *fnames, rowcount=1): """ Return the expected query string to INSERT the given columns. """ columns = sorted(fnames + ('create_uid', 'create_date', 'write_uid', 'write_date')) header = ", ".join(f'"{column}"' for column in columns) template = ", ".join("%s" for _index in range(rowcount)) return f'INSERT INTO "{model._table}" ({header}) VALUES {template} RETURNING "id"' def update(model, *fnames): """ Return the expected query string to UPDATE the given columns. """ table = f'"{model._table}"' fnames = sorted(fnames + ('write_uid', 'write_date')) columns = ", ".join(f'"{column}"' for column in fnames) assignments = ", ".join( f'"{fname}" = "__tmp"."{fname}"::{model._fields[fname].column_type[1]}' for fname in fnames ) return ( f'UPDATE {table} SET {assignments} ' f'FROM (VALUES %s) AS "__tmp"("id", {columns}) ' f'WHERE {table}."id" = "__tmp"."id"' ) class TestComputeQueries(TransactionCase): """ Test the queries made by create() with computed fields. """ def test_compute_readonly(self): model = self.env['test_new_api.compute.readonly'] model.create({}) # no value, no default with self.assertQueries([insert(model, 'foo'), update(model, 'bar')]): record = model.create({'foo': 'Foo'}) self.assertEqual(record.bar, 'Foo') # some value, no default with self.assertQueries([insert(model, 'foo', 'bar'), update(model, 'bar')]): record = model.create({'foo': 'Foo', 'bar': 'Bar'}) self.assertEqual(record.bar, 'Foo') model = model.with_context(default_bar='Def') # no value, some default with self.assertQueries([insert(model, 'foo', 'bar'), update(model, 'bar')]): record = model.create({'foo': 'Foo'}) self.assertEqual(record.bar, 'Foo') # some value, some default with self.assertQueries([insert(model, 'foo', 'bar'), update(model, 'bar')]): record = model.create({'foo': 'Foo', 'bar': 'Bar'}) self.assertEqual(record.bar, 'Foo') def test_compute_readwrite(self): model = self.env['test_new_api.compute.readwrite'] model.create({}) # no value, no default with self.assertQueries([insert(model, 'foo'), update(model, 'bar')]): record = model.create({'foo': 'Foo'}) self.assertEqual(record.bar, 'Foo') # some value, no default with self.assertQueries([insert(model, 'foo', 'bar')]): record = model.create({'foo': 'Foo', 'bar': 'Bar'}) self.assertEqual(record.bar, 'Bar') model = model.with_context(default_bar='Def') # no value, some default with self.assertQueries([insert(model, 'foo', 'bar')]): record = model.create({'foo': 'Foo'}) self.assertEqual(record.bar, 'Def') # some value, some default with self.assertQueries([insert(model, 'foo', 'bar')]): record = model.create({'foo': 'Foo', 'bar': 'Bar'}) self.assertEqual(record.bar, 'Bar') def test_compute_inverse(self): model = self.env['test_new_api.compute.inverse'] model.create({}) # no value, no default with self.assertQueries([insert(model, 'foo'), update(model, 'bar')]): record = model.create({'foo': 'Foo'}) self.assertEqual(record.foo, 'Foo') self.assertEqual(record.bar, 'Foo') # some value, no default with self.assertQueries([insert(model, 'foo', 'bar'), update(model, 'foo')]): record = model.create({'foo': 'Foo', 'bar': 'Bar'}) self.assertEqual(record.foo, 'Bar') self.assertEqual(record.bar, 'Bar') model = model.with_context(default_bar='Def') # no value, some default with self.assertQueries([insert(model, 'foo', 'bar'), update(model, 'foo')]): record = model.create({'foo': 'Foo'}) self.assertEqual(record.foo, 'Def') self.assertEqual(record.bar, 'Def') # some value, some default with self.assertQueries([insert(model, 'foo', 'bar'), update(model, 'foo')]): record = model.create({'foo': 'Foo', 'bar': 'Bar'}) self.assertEqual(record.foo, 'Bar') self.assertEqual(record.bar, 'Bar') def test_multi_create(self): model = self.env['test_new_api.foo'] model.create({}) with self.assertQueries([insert(model, 'name', 'value1', 'value2', rowcount=4)]): create_values = [ {'name': 'Foo1', 'value1': 10}, {'name': 'Foo2', 'value2': 12}, {'name': 'Foo3'}, {}, ] records = model.create(create_values) self.assertEqual(records.mapped('name'), ['Foo1', 'Foo2', 'Foo3', False]) self.assertEqual(records.mapped('value1'), [10, 0, 0, 0]) self.assertEqual(records.mapped('value2'), [0, 12, 0, 0]) def test_partial_compute_batching(self): """ Create several 'new' records and check that the partial compute method is called only once. """ order = self.env['test_new_api.order'].new({ 'line_ids': [Command.create({'reward': False})] * 100, }) OrderLine = self.env.registry['test_new_api.order.line'] with patch.object( OrderLine, '_compute_has_been_rewarded', side_effect=OrderLine._compute_has_been_rewarded, autospec=True, ) as patch_compute: order.line_ids.mapped('has_been_rewarded') self.assertEqual(patch_compute.call_count, 1) class TestComputeSudo(TransactionCaseWithUserDemo): def test_compute_sudo_depends_context_uid(self): record = self.env['test_new_api.compute.sudo'].create({}) self.assertEqual(record.with_user(self.user_demo).name_for_uid, self.user_demo.name) class test_shared_cache(TransactionCaseWithUserDemo): def test_shared_cache_computed_field(self): # Test case: Check that the shared cache is not used if a compute_sudo stored field # is computed IF there is an ir.rule defined on this specific model. # Real life example: # A user can only see its own timesheets on a task, but the field "Planned Hours", # which is stored-compute_sudo, should take all the timesheet lines into account # However, when adding a new line and then recomputing the value, no existing line # from another user is binded on self, then the value is erased and saved on the # database. task = self.env['test_new_api.model_shared_cache_compute_parent'].create({ 'name': 'Shared Task'}) self.env['test_new_api.model_shared_cache_compute_line'].create({ 'user_id': self.env.ref('base.user_admin').id, 'parent_id': task.id, 'amount': 1, }) self.assertEqual(task.total_amount, 1) self.env.flush_all() self.env.invalidate_all() # Start fresh, as it would be the case on 2 different sessions. task = task.with_user(self.user_demo) with Form(task) as task_form: # Use demo has no access to the already existing line self.assertEqual(len(task_form.line_ids), 0) # But see the real total_amount self.assertEqual(task_form.total_amount, 1) # Now let's add a new line (and retrigger the compute method) with task_form.line_ids.new() as line: line.amount = 2 # The new value for total_amount, should be 3, not 2. self.assertEqual(task_form.total_amount, 2) @tagged('unlink_constraints') class TestUnlinkConstraints(TransactionCase): @classmethod def setUpClass(cls): super().setUpClass() MODEL = cls.env['test_new_api.model_constrained_unlinks'] cls.deletable_bar = MODEL.create({'bar': 5}) cls.undeletable_bar = MODEL.create({'bar': 6}) cls.deletable_foo = MODEL.create({'foo': 'formaggio'}) cls.undeletable_foo = MODEL.create({'foo': 'prosciutto'}) from odoo.addons.base.models.ir_model import MODULE_UNINSTALL_FLAG uninstall = {MODULE_UNINSTALL_FLAG: True} cls.undeletable_bar_uninstall = cls.undeletable_bar.with_context(**uninstall) cls.undeletable_foo_uninstall = cls.undeletable_foo.with_context(**uninstall) def test_unlink_constraint_manual_bar(self): self.assertTrue(self.deletable_bar.unlink()) with self.assertRaises(ValueError, msg="Nooooooooo bar can't be greater than five!!"): self.undeletable_bar.unlink() def test_unlink_constraint_uninstall_bar(self): self.assertTrue(self.deletable_bar.unlink()) # should succeed since it's at_uninstall=False self.assertTrue(self.undeletable_bar_uninstall.unlink()) def test_unlink_constraint_manual_foo(self): self.assertTrue(self.deletable_foo.unlink()) with self.assertRaises(ValueError, msg="You didn't say if you wanted it crudo or cotto..."): self.undeletable_foo.unlink() def test_unlink_constraint_uninstall_foo(self): self.assertTrue(self.deletable_foo) # should fail since it's at_uninstall=True with self.assertRaises(ValueError, msg="You didn't say if you wanted it crudo or cotto..."): self.undeletable_foo_uninstall.unlink() @tagged('wrong_related_path') class TestWrongRelatedError(TransactionCase): def test_wrong_related_path(self): class Foo(models.Model): _module = None _name = _description = 'test_new_api.wrong_related_path' foo_id = fields.Many2one('test_new_api.foo') foo_non_existing = fields.Char(related='foo_id.non_existing_field') Foo._build_model(self.registry, self.env.cr) self.addCleanup(self.registry.__delitem__, Foo._name) errMsg = ( "Field non_existing_field referenced in related field definition " "test_new_api.wrong_related_path.foo_non_existing does not exist." ) with self.assertRaisesRegex(KeyError, errMsg): self.registry.setup_models(self.env.cr) class TestPrecomputeModel(TransactionCase): def test_precompute_consistency(self): Model = self.registry['test_new_api.precompute'] self.assertEqual(Model.lower.compute, Model.upper.compute) self.assertTrue(Model.lower.precompute) self.assertTrue(Model.upper.precompute) # see what happens if not both are precompute self.addCleanup(self.registry.reset_changes) self.patch(Model.upper, 'precompute', False) with self.assertWarns(UserWarning): self.registry.setup_models(self.cr) self.registry.field_computed def test_precompute_dependencies_base(self): Model = self.registry['test_new_api.precompute'] self.assertTrue(Model.lower.precompute) self.assertTrue(Model.upper.precompute) self.assertTrue(Model.lowup.precompute) # see what happens if precompute depends on non-precompute self.addCleanup(self.registry.reset_changes) def reset(): Model.lowup.precompute = True self.addCleanup(reset) self.patch(Model.lower, 'precompute', False) self.patch(Model.upper, 'precompute', False) with self.assertWarns(UserWarning): self.registry.setup_models(self.cr) self.registry.get_trigger_tree(Model._fields.values()) def test_precompute_dependencies_many2one(self): Model = self.registry['test_new_api.precompute'] Partner = self.registry['res.partner'] # Model.commercial_id depends on partner_id.commercial_partner_id, and # precomputation is valid when traversing many2one fields self.assertTrue(Model.commercial_id.precompute) self.assertFalse(Partner.commercial_partner_id.precompute) def test_precompute_dependencies_one2many(self): Model = self.registry['test_new_api.precompute'] Line = self.registry['test_new_api.precompute.line'] self.assertTrue(Model.size.precompute) self.assertTrue(Line.size.precompute) # see what happens if precompute depends on non-precompute self.addCleanup(self.registry.reset_changes) # ensure that Model.size.precompute is restored after setup_models() self.patch(Model.size, 'precompute', True) self.patch(Line.size, 'precompute', False) with self.assertWarns(UserWarning): self.registry.setup_models(self.cr) self.registry.get_trigger_tree(Model._fields.values()) class TestPrecompute(TransactionCase): def test_precompute(self): model = self.env['test_new_api.precompute'] Model = self.registry['test_new_api.precompute'] self.assertTrue(Model.lower.precompute) self.assertTrue(Model.upper.precompute) self.assertTrue(Model.lowup.precompute) # warmup model.create({'name': 'Foo', 'line_ids': [Command.create({'name': 'bar'})]}) # the creation makes one insert query for the main record, and one for the line with self.assertQueries([ insert(model, 'name', 'lower', 'upper', 'lowup', 'commercial_id', 'size'), insert(model.line_ids, 'parent_id', 'name', 'size'), ]): record = model.create({'name': 'Foo', 'line_ids': [Command.create({'name': 'bar'})]}) # check the values in the database self.cr.execute(f'SELECT * FROM "{model._table}" WHERE id=%s', [record.id]) [row] = self.cr.dictfetchall() self.assertEqual(row['name'], 'Foo') self.assertEqual(row['lower'], 'foo') self.assertEqual(row['upper'], 'FOO') self.assertEqual(row['lowup'], 'fooFOO') self.assertEqual(row['size'], 3) def test_precompute_combo(self): model = self.env['test_new_api.precompute.combo'] # warmup model.create({}) QUERIES = [insert(model, 'name', 'reader', 'editer', 'setter')] # no value at all with self.assertQueries(QUERIES): record = model.create({'name': 'A'}) self.assertEqual(record.reader, 'A') self.assertEqual(record.editer, 'A') self.assertEqual(record.setter, 'A') # default value with self.assertQueries(QUERIES), self.assertLogs('precompute_setter', level='WARNING'): defaults = dict(default_reader='X', default_editer='Y', default_setter='Z') record = model.with_context(**defaults).create({'name': 'A'}) self.assertEqual(record.reader, 'A') self.assertEqual(record.editer, 'Y') self.assertEqual(record.setter, 'Z') # explicit value with self.assertQueries(QUERIES), self.assertLogs('precompute_setter', level='WARNING'): record = model.create({'name': 'A', 'reader': 'X', 'editer': 'Y', 'setter': 'Z'}) self.assertEqual(record.reader, 'A') self.assertEqual(record.editer, 'Y') self.assertEqual(record.setter, 'Z') def test_precompute_editable(self): model = self.env['test_new_api.precompute.editable'] # no value for bar, no value for baz record = model.create({'foo': 'foo'}) self.assertEqual(record.bar, 'COMPUTED') self.assertEqual(record.baz, 'COMPUTED') self.assertEqual(record.baz2, 'COMPUTED') # value for bar, no value for baz record = model.create({'foo': 'foo', 'bar': 'bar'}) self.assertEqual(record.bar, 'bar') self.assertEqual(record.baz, 'COMPUTED') self.assertEqual(record.baz2, 'COMPUTED') # no value for bar, value for baz: the computation of bar should not # recompute baz in memory, in case a third field depends on it record = model.create({'foo': 'foo', 'baz': 'baz'}) self.assertEqual(record.bar, 'COMPUTED') self.assertEqual(record.baz, 'baz') self.assertEqual(record.baz2, 'baz') # value for bar, value for baz record = model.create({'foo': 'foo', 'bar': 'bar', 'baz': 'baz'}) self.assertEqual(record.bar, 'bar') self.assertEqual(record.baz, 'baz') self.assertEqual(record.baz2, 'baz') def test_precompute_readonly(self): """ Ensures - a stored, precomputed, readonly field cannot be altered by the user, - a stored, precomputed, readonly field, but with a states attributes changing the readonly of the field according to the state of the record, can be altered by the user. The `bar` field is store=True, precompute=True, readonly=True The `baz` field is store=True, precompute=True, readonly=False, """ model = self.env['test_new_api.precompute.readonly'] # no value for bar, no value for baz record = model.create({'foo': 'foo'}) self.assertEqual(record.bar, 'COMPUTED') self.assertEqual(record.baz, 'COMPUTED') # value for bar, no value for baz # bar is readonly, it must ignore the value for bar in the create values record = model.create({'foo': 'foo', 'bar': 'bar'}) self.assertEqual(record.bar, 'COMPUTED') self.assertEqual(record.baz, 'COMPUTED') # no value for bar, value for baz # baz is readonly=False # the value for baz must be taken into account record = model.create({'foo': 'foo', 'baz': 'baz'}) self.assertEqual(record.bar, 'COMPUTED') self.assertEqual(record.baz, 'baz') # value for bar, value for baz # bar must be ignored # baz must be taken into account record = model.create({'foo': 'foo', 'bar': 'bar', 'baz': 'baz'}) self.assertEqual(record.bar, 'COMPUTED') self.assertEqual(record.baz, 'baz') def test_precompute_required(self): model = self.env['test_new_api.precompute.required'] field = type(model).name self.assertTrue(field.related) self.assertTrue(field.store) self.assertTrue(field.required) partner = self.env['res.partner'].create({'name': 'Foo'}) # this will crash if field is not precomputed record = model.create({'partner_id': partner.id}) self.assertEqual(record.name, 'Foo') # check the queries being made QUERIES = [insert(model, 'partner_id', 'name')] with self.assertQueries(QUERIES): record = model.create({'partner_id': partner.id}) def test_precompute_batch(self): model = self.env['test_new_api.precompute.required'] partners = self.env['res.partner'].create([ {'name': name} for name in "Foo Bar Baz".split() ]) # warmup model.create({'partner_id': partners[0].id}) self.env.flush_all() self.env.invalidate_all() # check the number of queries: 1 SELECT + 1 INSERT with self.assertQueryCount(2): model.create([{'partner_id': pid} for pid in partners.ids]) def test_precompute_monetary(self): """Make sure the rounding of monetaries correctly prefetches currency fields""" model = self.env['test_new_api.precompute.monetary'] currency = self.env['res.currency'] # warmup model.create({}) self.env.flush_all() self.env.invalidate_all() fnames = [fname for fname, field in currency._fields.items() if field.prefetch] QUERIES = [ select(currency, *fnames), insert(model, 'amount', 'currency_id'), select(model, 'currency_id'), ] with self.assertQueries(QUERIES): model.create({}) class TestModifiedPerformance(TransactionCase): @classmethod def setUpClass(cls): super().setUpClass() cls.Modified = cls.env['test_new_api.modified'] cls.ModifiedLine = cls.env['test_new_api.modified.line'] cls.modified_a = cls.Modified.create({ 'name': 'Test', }) cls.modified_line_a = cls.ModifiedLine.create({ 'modified_id': cls.modified_a.id, 'quantity': 5, 'price': 1, }) cls.modified_line_a_child = cls.ModifiedLine.create({ 'modified_id': cls.modified_a.id, 'quantity': 5, 'price': 2, 'parent_id': cls.modified_line_a.id, }) cls.modified_line_a_child_child = cls.ModifiedLine.create({ 'modified_id': cls.modified_a.id, 'quantity': 5, 'price': 3, 'parent_id': cls.modified_line_a_child.id, }) cls.env.invalidate_all() # Clean the cache def test_modified_trigger_related(self): with self.assertQueryCount(0, flush=False): # No queries because `modified_name` has a empty cache self.modified_a.name = "Other" self.assertEqual(self.modified_line_a.modified_name, 'Other') # check def test_modified_trigger_no_store_compute(self): with self.assertQueryCount(0, flush=False): # No queries because `total_quantity` has a empty cache self.modified_line_a.quantity = 8 self.assertEqual(self.modified_a.total_quantity, 18) def test_modified_trigger_recursive_empty_cache(self): with self.assertQueryCount(0, flush=False): # No queries because `total_price` has a empty cache self.modified_line_a_child_child.price = 4 self.assertEqual(self.modified_line_a.total_price, 7) self.assertEqual(self.modified_line_a.total_price_quantity, 35) self.assertEqual(self.modified_line_a_child.total_price, 6) self.assertEqual(self.modified_line_a_child.total_price_quantity, 30) def test_modified_trigger_recursive_fill_cache(self): self.assertEqual(self.modified_line_a.total_price, 6) self.assertEqual(self.modified_line_a.total_price_quantity, 30) with self.assertQueryCount(0, flush=False): # No query because the `modified_line_a.total_price` has fetch every data needed self.modified_line_a_child_child.price = 4 self.assertEqual(self.modified_line_a.total_price_quantity, 35) self.assertEqual(self.modified_line_a.total_price, 7) def test_modified_trigger_recursive_partial_invalidate(self): self.assertEqual(self.modified_line_a_child.total_price_quantity, 25) self.modified_line_a_child_child.invalidate_recordset() self.modified_line_a_child.price with self.assertQueries([""" SELECT "test_new_api_modified_line"."id", "test_new_api_modified_line"."modified_id", "test_new_api_modified_line"."quantity", "test_new_api_modified_line"."parent_id", "test_new_api_modified_line"."create_uid", "test_new_api_modified_line"."create_date" FROM "test_new_api_modified_line" WHERE ("test_new_api_modified_line"."id" IN %s) """, """ SELECT "test_new_api_modified_line"."id", "test_new_api_modified_line"."parent_id" FROM "test_new_api_modified_line" WHERE ("test_new_api_modified_line"."id" IN %s) """], flush=False): # Two requests: # - one for fetch modified_line_a_child_child data (invalidate just before) # - one because modified_line_a_child.parent_id (invalidate just before because we invalidate inverse in `_invalidate_cache`, # see TODO) -> We should change that self.modified_line_a_child_child.price = 4 self.assertEqual(self.modified_line_a_child_child.total_price_quantity, 20) self.assertEqual(self.modified_line_a_child.total_price_quantity, 30) self.assertEqual(self.modified_line_a.total_price_quantity, 35) self.assertEqual(self.modified_line_a.total_price, 7)