# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
#
# test cases for new-style fields
#
import base64
import json
from collections import OrderedDict
from datetime import date, datetime, time
import io
from PIL import Image
from unittest.mock import patch
import psycopg2
import threading
from odoo import models, fields, Command
from odoo.addons.base.tests.common import TransactionCaseWithUserDemo
from odoo.addons.base.tests.test_expression import TransactionExpressionCase
from odoo.exceptions import AccessError, MissingError, UserError, ValidationError
from odoo.tests import TransactionCase, tagged, Form, users
from odoo.tools import mute_logger, float_repr
from odoo.tools.date_utils import add, subtract, start_of, end_of
from odoo.tools.image import image_data_uri
class TestFields(TransactionCaseWithUserDemo, TransactionExpressionCase):
def setUp(self):
    """ Register registry cleanups, then seed the demo discussion and message. """
    # for tests methods that create custom models/fields: both cleanups are
    # registered before delegating to the parent setUp
    self.addCleanup(self.registry.reset_changes)
    self.addCleanup(self.registry.clear_all_caches)
    super(TestFields, self).setUp()

    # make the demo user a participant of the demo discussion
    demo_discussion = self.env.ref('test_new_api.discussion_0')
    demo_discussion.write({'participants': [Command.link(self.user_demo.id)]})
    # YTI FIX ME: The cache shouldn't be inconsistent (rco is gonna fix it)
    # self.env.ref('test_new_api.discussion_0').participants -> 1 user
    # self.env.ref('test_new_api.discussion_0').invalidate()
    # self.env.ref('test_new_api.discussion_0').with_context(active_test=False).participants -> 2 users

    # ... and the author of one of its messages
    demo_message = self.env.ref('test_new_api.message_0_1')
    demo_message.write({'author': self.user_demo.id})
def test_00_basics(self):
    """ test accessing new fields """
    # find a discussion
    discussion = self.env.ref('test_new_api.discussion_0')

    # the field can be read as a record attribute or as a record item,
    # and both access paths must give the same string
    for value in (discussion.name, discussion['name']):
        self.assertIsInstance(value, str)
    self.assertEqual(discussion['name'], discussion.name)

    # the same value comes back from method read()
    rows = discussion.read(['name'])
    self.assertEqual(rows[0]['name'], discussion.name)
def test_01_basic_get_assertion(self):
    """ test item getter

    Reading a field must work on a single record and raise ``ValueError``
    on a recordset holding several records.
    """
    # field access works on single record
    record = self.env.ref('test_new_api.message_0_0')
    self.assertEqual(len(record), 1)
    record.body  # must simply not raise

    # field access fails on multiple records
    records = self.env['test_new_api.message'].search([])
    # precondition checked with a unittest assertion: a bare ``assert`` is
    # stripped under ``python -O``, which would make the check below vacuous
    self.assertGreater(len(records), 1)
    with self.assertRaises(ValueError):
        records.body
def test_01_basic_set_assertion(self):
    """ test item setter """
    # assigning a field on a single record works
    single = self.env.ref('test_new_api.message_0_0')
    self.assertEqual(len(single), 1)
    single.body = 'OK'

    # assigning a field on several records should assign the value to all of them
    every_message = self.env['test_new_api.message'].search([])
    every_message.body = 'Updated'
    self.assertTrue(all(msg.body == 'Updated' for msg in every_message))

    # field assignment does not cache the wrong value when write overridden
    single.priority = 4
    self.assertEqual(single.priority, 5)
def test_05_unknown_fields(self):
    """ test ORM operations with unknown fields """
    def invalid_field():
        # fresh context manager expecting a ValueError matching 'Invalid field'
        return self.assertRaisesRegex(ValueError, 'Invalid field')

    cat = self.env['test_new_api.category'].create({'name': 'Foo'})

    # search: in the domain, in the order
    with invalid_field():
        cat.search([('zzz', '=', 42)])
    with invalid_field():
        cat.search([], order='zzz')

    # read
    with invalid_field():
        cat.read(['zzz'])

    # read_group: in the domain, the aggregated fields, the groupby
    with invalid_field():
        cat.read_group([('zzz', '=', 42)], fields=['color'], groupby=['parent'])
    with invalid_field():
        cat.read_group([], fields=['zzz'], groupby=['parent'])
    with invalid_field():
        cat.read_group([], fields=['zzz:sum'], groupby=['parent'])
    with invalid_field():
        cat.read_group([], fields=['color'], groupby=['zzz'])
    # ... an unknown orderby is reported with a different message
    with self.assertRaisesRegex(ValueError, 'is not a valid aggregate'):
        cat.read_group([], fields=['color'], groupby=['parent'], orderby='zzz')

    # exception: accept '__count' as field to aggregate
    cat.read_group([], fields=['__count'], groupby=['parent'])

    # create, write, new
    with invalid_field():
        cat.create({'name': 'Foo', 'zzz': 42})
    with invalid_field():
        cat.write({'zzz': 42})
    with invalid_field():
        cat.new({'name': 'Foo', 'zzz': 42})
def test_10_computed(self):
    """ check definition of computed fields """
    checked_attributes = ('store', 'compute_sudo', 'readonly', 'copy')
    expectations = [
        # (model, field, store, compute_sudo, readonly, copy)
        # by default function fields are not stored, readonly, not copied
        ('test_new_api.message', 'size', False, False, True, False),
        ('test_new_api.message', 'name', True, True, True, False),
        # stored editable computed fields are copied according to their type
        ('test_new_api.compute.onchange', 'baz', True, True, False, True),
        # like a regular one2many field
        ('test_new_api.compute.onchange', 'line_ids', True, True, False, False),
        # like a regular many2many field
        ('test_new_api.compute.onchange', 'tag_ids', True, True, False, True),
    ]
    for model_name, field_name, *flags in expectations:
        field = self.env[model_name]._fields[field_name]
        for attribute, expected in zip(checked_attributes, flags):
            check = self.assertTrue if expected else self.assertFalse
            check(getattr(field, attribute))
def test_10_computed_custom(self):
    """ check definition of custom computed fields """
    # Flush demo user before creating a new ir.model.fields to avoid
    # a deadlock
    self.env.flush_all()

    # definition of a non-stored boolean whose compute takes no dependency
    definition = {
        'name': 'x_bool_false_computed',
        'model_id': self.env.ref('test_new_api.model_test_new_api_message').id,
        'field_description': 'A boolean computed to false',
        'compute': "for r in self: r['x_bool_false_computed'] = False",
        'store': False,
        'ttype': 'boolean'
    }
    self.env['ir.model.fields'].create(definition)

    # the registry must not record any dependency for the new field
    custom_field = self.env['test_new_api.message']._fields['x_bool_false_computed']
    self.assertFalse(self.registry.field_depends[custom_field])
def test_10_computed_custom_invalid_transitive_depends(self):
    """ check the trigger tree of custom computed fields, some of which
    have an invalid dependency (directly or transitively)
    """
    # replace the check on 'depends' by a no-op; presumably it would
    # otherwise refuse the invalid dependency 'bar' created below
    self.patch(type(self.env["ir.model.fields"]), "_check_depends", lambda self: True)

    def create_computed_boolean(name, description, depends):
        """ Create on test_new_api.foo a manual, non-stored boolean field
        ``name`` whose compute assigns False and depends on ``depends``.
        """
        self.env["ir.model.fields"].create(
            {
                "name": name,
                "model_id": self.env.ref("test_new_api.model_test_new_api_foo").id,
                "state": "manual",
                "field_description": description,
                "compute": "for r in self: r['%s'] = False" % name,
                "depends": depends,
                "store": False,
                "ttype": "boolean",
            }
        )

    create_computed_boolean(
        "x_computed_custom_valid_depends",
        "A compute with a valid depends",
        "value1",
    )
    create_computed_boolean(
        "x_computed_custom_valid_transitive_depends",
        "A compute with a valid transitive depends",
        "x_computed_custom_valid_depends",
    )
    create_computed_boolean(
        "x_computed_custom_invalid_depends",
        "A compute with an invalid depends",
        "bar",
    )
    create_computed_boolean(
        "x_computed_custom_invalid_transitive_depends",
        "A compute with an invalid transitive depends",
        "x_computed_custom_invalid_depends",
    )

    # named 'model_fields' so that the imported module 'fields' is not shadowed
    model_fields = self.env["test_new_api.foo"]._fields
    get_trigger_tree = self.registry.get_trigger_tree
    value1 = model_fields["value1"]
    valid_depends = model_fields["x_computed_custom_valid_depends"]
    valid_transitive_depends = model_fields["x_computed_custom_valid_transitive_depends"]
    invalid_depends = model_fields["x_computed_custom_invalid_depends"]
    invalid_transitive_depends = model_fields["x_computed_custom_invalid_transitive_depends"]

    # `x_computed_custom_valid_depends` in the triggers of the field `value1`
    self.assertIn(valid_depends, get_trigger_tree([value1]).root)
    # `x_computed_custom_valid_transitive_depends` in the triggers `x_computed_custom_valid_depends` and `value1`
    self.assertIn(valid_transitive_depends, get_trigger_tree([valid_depends]).root)
    self.assertIn(valid_transitive_depends, get_trigger_tree([value1]).root)
    # `x_computed_custom_invalid_depends` not in any triggers, as it was invalid and was skipped
    self.assertEqual(
        sum(invalid_depends in get_trigger_tree([field]).root for field in model_fields.values()), 0
    )
    # `x_computed_custom_invalid_transitive_depends` in the triggers of `x_computed_custom_invalid_depends` only
    self.assertIn(invalid_transitive_depends, get_trigger_tree([invalid_depends]).root)
    self.assertEqual(
        sum(invalid_transitive_depends in get_trigger_tree([field]).root for field in model_fields.values()), 1
    )
@mute_logger('odoo.fields')
def test_10_computed_stored_x_name(self):
    """ check that models can be set up when a computed 'x_name' depends on
    a field whose comodel is not loaded
    """
    # create a custom model with two fields
    self.env["ir.model"].create({
        "name": "x_test_10_compute_store_x_name",
        "model": "x_test_10_compute_store_x_name",
        "field_id": [
            (0, 0, {'name': 'x_name', 'ttype': 'char'}),
            (0, 0, {'name': 'x_stuff_id', 'ttype': 'many2one', 'relation': 'ir.model'}),
        ],
    })
    self.env.invalidate_all()
    # set 'x_stuff_id' refer to a model not loaded yet
    # (the field definitions are altered directly in table ir_model_fields)
    self.cr.execute("""
UPDATE ir_model_fields
SET relation = 'not.loaded'
WHERE model = 'x_test_10_compute_store_x_name' AND name = 'x_stuff_id'
""")
    # set 'x_name' be computed and depend on 'x_stuff_id'
    self.cr.execute("""
UPDATE ir_model_fields
SET compute = 'pass', depends = 'x_stuff_id.x_custom_1'
WHERE model = 'x_test_10_compute_store_x_name' AND name = 'x_name'
""")
    # setting up models should not crash
    self.registry.setup_models(self.cr)
def test_10_display_name(self):
    """ test definition of automatic field 'display_name' """
    discussion_class = type(self.env['test_new_api.discussion'])
    display_name = discussion_class.display_name

    # the field is automatic and computed ...
    self.assertTrue(display_name.automatic)
    self.assertTrue(display_name.compute)
    # ... and the registry knows it depends on 'name' only
    dependencies = self.registry.field_depends[display_name]
    self.assertEqual(dependencies, ('name',))
def test_10_non_stored(self):
    """ test non-stored fields """
    # a field declared with store=False should not have a column
    field = self.env['test_new_api.category']._fields['dummy']
    self.assertFalse(field.store)
    self.assertFalse(field.compute)
    self.assertFalse(field.inverse)

    # find messages
    for message in self.env['test_new_api.message'].search([]):
        # check definition of field: 'size' is the length of 'body',
        # a missing body counting as the empty string
        self.assertEqual(message.size, len(message.body or ''))

        # check recomputation after record is modified: three characters
        # are appended, so the size must grow by three
        size = message.size
        message.write({'body': (message.body or '') + "!!!"})
        self.assertEqual(message.size, size + 3)

    # create a message, assign body, and check size in several environments
    # (message2 is the same record seen as the demo user)
    message1 = self.env['test_new_api.message'].create({})
    message2 = message1.with_user(self.user_demo)
    self.assertEqual(message1.size, 0)
    self.assertEqual(message2.size, 0)

    # the write goes through message1 only, both views must see the new size
    message1.write({'body': "XXX"})
    self.assertEqual(message1.size, 3)
    self.assertEqual(message2.size, 3)

    # special case: computed field without dependency must be computed
    record = self.env['test_new_api.mixed'].create({})
    self.assertTrue(record.now)
def test_11_stored(self):
    """ test stored fields """
    def check_stored(disc):
        """ Check the stored computed field on disc.messages """
        # expected message name: "[<discussion name>] <author name>"
        for msg in disc.messages:
            self.assertEqual(msg.name, "[%s] %s" % (disc.name, msg.author.name))

    # find the demo discussion, and check messages
    discussion1 = self.env.ref('test_new_api.discussion_0')
    self.assertTrue(discussion1.messages)
    check_stored(discussion1)

    # modify discussion name, and check again messages
    discussion1.name = 'Talking about stuff...'
    check_stored(discussion1)

    # switch message from discussion, and check again

    # See YTI FIXME
    self.env.invalidate_all()

    discussion2 = discussion1.copy({'name': 'Another discussion'})
    message2 = discussion1.messages[0]
    message2.discussion = discussion2
    check_stored(discussion2)

    # create a new discussion with messages, and check their name
    user_root = self.env.ref('base.user_root')
    user_demo = self.user_demo
    discussion3 = self.env['test_new_api.discussion'].create({
        'name': 'Stuff',
        'participants': [Command.link(user_root.id), Command.link(user_demo.id)],
        'messages': [
            Command.create({'author': user_root.id, 'body': 'one'}),
            Command.create({'author': user_demo.id, 'body': 'two'}),
            Command.create({'author': user_root.id, 'body': 'three'}),
        ],
    })
    check_stored(discussion3)

    # modify the discussion messages: edit the 2nd one, remove the last one
    # (keep modifications in that order, as they reproduce a former bug!)
    discussion3.write({
        'messages': [
            Command.link(discussion3.messages[0].id),
            Command.update(discussion3.messages[1].id, {'author': user_root.id}),
            Command.delete(discussion3.messages[2].id),
        ],
    })
    check_stored(discussion3)
def test_11_stored_protected(self):
    """ test protection against recomputation """
    model = self.env['test_new_api.compute.readonly']
    field = model._fields['bar']

    # without protection, 'bar' follows the value written on 'foo'
    record = model.create({'foo': 'unprotected #1'})
    self.assertEqual(record.bar, 'unprotected #1')

    record.write({'foo': 'unprotected #2'})
    self.assertEqual(record.bar, 'unprotected #2')

    # by protecting 'bar', we prevent it from being recomputed
    with self.env.protecting([field], record):
        record.write({'foo': 'protected'})
        self.assertEqual(record.bar, 'unprotected #2')

        # also works when nested
        with self.env.protecting([field], record):
            record.write({'foo': 'protected'})
            self.assertEqual(record.bar, 'unprotected #2')

        # leaving the inner context must not lift the outer protection
        record.write({'foo': 'protected'})
        self.assertEqual(record.bar, 'unprotected #2')

    # outside of any protection, recomputation happens again
    record.write({'foo': 'unprotected #3'})
    self.assertEqual(record.bar, 'unprotected #3')

    # also works with duplicated fields
    with self.env.protecting([field, field], record):
        record.write({'foo': 'protected'})
        self.assertEqual(record.bar, 'unprotected #3')

    record.write({'foo': 'unprotected #4'})
    self.assertEqual(record.bar, 'unprotected #4')

    # we protect 'bar' on a different record: record2 is not protected,
    # so its 'bar' is computed as usual
    with self.env.protecting([field], record):
        record2 = model.create({'foo': 'unprotected'})
        self.assertEqual(record2.bar, 'unprotected')
def test_11_computed_access(self):
    """ test computed fields with access right errors """
    User = self.env['res.users']
    user1 = User.create({'name': 'Aaaah', 'login': 'a'})
    user2 = User.create({'name': 'Boooh', 'login': 'b'})
    user3 = User.create({'name': 'Crrrr', 'login': 'c'})

    # add a rule to not give access to user2
    self.env['ir.rule'].create({
        'model_id': self.env['ir.model'].search([('model', '=', 'res.users')]).id,
        'domain_force': "[('id', '!=', %d)]" % user2.id,
    })

    # DLE P72: Since we decided that we do not raise security access errors for data to which we had the occasion
    # to put the value in the cache, we need to invalidate the cache for user1, user2 and user3 in order
    # to test the below access error. Otherwise the above create calls set in the cache the information needed
    # to compute `company_type` ('is_company'), and doesn't need to trigger a read.
    # We need to force the read in order to test the security access
    self.env.invalidate_all()

    # group users as a recordset, and read them as user demo
    users = (user1 + user2 + user3).with_user(self.user_demo)
    user1, user2, user3 = users

    # regression test: a bug invalidated the field's value from cache
    # (user1 and user3 must stay readable around the failing access to user2)
    user1.company_type
    with self.assertRaises(AccessError):
        user2.company_type
    user3.company_type
def test_12_recursive(self):
    """ test recursively dependent fields """
    Category = self.env['test_new_api.category']
    names = ('Abel', 'Bethany', 'Catherine', 'Dean', 'Ewan', 'Finnley', 'Gabriel')
    abel, beth, cath, dean, ewan, finn, gabe = [
        Category.create({'name': name}) for name in names
    ]

    # build the hierarchy, one parent assignment at a time:
    #   Gabriel > Catherine > (Abel, Bethany)
    #   Gabriel > Finnley > (Dean, Ewan)
    hierarchy = [
        (cath, gabe), (finn, gabe),
        (abel, cath), (beth, cath),
        (dean, finn), (ewan, finn),
    ]
    for child, parent in hierarchy:
        child.parent = parent

    # display_name is the path from the root category
    expected_paths = [
        (abel, "Gabriel / Catherine / Abel"),
        (beth, "Gabriel / Catherine / Bethany"),
        (cath, "Gabriel / Catherine"),
        (dean, "Gabriel / Finnley / Dean"),
        (ewan, "Gabriel / Finnley / Ewan"),
        (finn, "Gabriel / Finnley"),
        (gabe, "Gabriel"),
    ]
    for category, path in expected_paths:
        self.assertEqual(category.display_name, path)

    # move a leaf under another parent
    ewan.parent = cath
    self.assertEqual(ewan.display_name, "Gabriel / Catherine / Ewan")

    # move an intermediate node: the path of its descendant follows
    cath.parent = finn
    self.assertEqual(ewan.display_name, "Gabriel / Finnley / Catherine / Ewan")
def test_12_recursive_recompute(self):
    """ test recomputation on recursively dependent field """
    a = self.env['test_new_api.recursive'].create({'name': 'A'})
    b = self.env['test_new_api.recursive'].create({'name': 'B', 'parent': a.id})
    c = self.env['test_new_api.recursive'].create({'name': 'C', 'parent': b.id})
    d = self.env['test_new_api.recursive'].create({'name': 'D', 'parent': c.id})
    chain = (a, b, c, d)

    def check_names(*expected):
        # 'full_name' is checked on a, b, c, d first, then 'display_name'
        for field_name in ('full_name', 'display_name'):
            for rec, value in zip(chain, expected):
                self.assertEqual(getattr(rec, field_name), value)

    check_names('A', 'A / B', 'A / B / C', 'A / B / C / D')

    # renaming the root is reflected on all its descendants
    a.name = 'A1'
    check_names('A1', 'A1 / B', 'A1 / B / C', 'A1 / B / C / D')

    # detaching b makes it the root of its own chain
    b.parent = False
    check_names('A1', 'B', 'B / C', 'B / C / D')

    # rename several records to trigger several recomputations at once
    (d + c + b).write({'name': 'X'})
    check_names('A1', 'X', 'X / X', 'X / X / X')

    # delete b; both c and d are deleted in cascade; c should also be marked
    # to recompute, but recomputation should not fail...
    b.unlink()
    self.assertEqual((a + b + c + d).exists(), a)
def test_12_recursive_tree(self):
    """ check 'display_name' of a tree node as descendants get created """
    root = self.env['test_new_api.recursive.tree'].create({'name': 'foo'})
    self.assertEqual(root.display_name, 'foo()')

    # a child shows up inside the parentheses of its parent ...
    child = root.create({'name': 'bar', 'parent_id': root.id})
    self.assertEqual(root.display_name, 'foo(bar())')

    # ... and so does a grandchild, one level deeper
    root.create({'name': 'baz', 'parent_id': child.id})
    self.assertEqual(root.display_name, 'foo(bar(baz()))')
def test_12_recursive_unlink(self):
    """ check that unlinking an order does not crash with recursive fields """
    order = self.env['test_new_api.recursive.order'].create({'value': 42})
    line = self.env['test_new_api.recursive.line'].create({'order_id': order.id})
    task = self.env['test_new_api.recursive.task'].create({'value': 42})
    # the task gets linked to the line, both ways
    self.assertEqual(task.line_id, line)
    self.assertEqual(line.task_ids, task)
    self.assertTrue(line.task_number)

    # Before deleting order, the following are marked to recompute:
    #  - task.line_id (recursive, depends on task.line_id.order_id.value)
    #  - line.task_number (implicitly depends on line.task_ids.line_id)
    #
    # If task.line_id is ever recomputed in order to mark line.task_number,
    # its recomputed value will be lost in the cache invalidation, and
    # there will be nothing left to write in the database afterwards!  This
    # makes the call to unlink() crash in that case.
    #
    order.unlink()
def test_12_recursive_context_dependent(self):
    """ check 'context_dependent_name' after a rename made in another context """
    a = self.env['test_new_api.recursive'].create({'name': 'A'})
    b = self.env['test_new_api.recursive'].create({'name': 'B', 'parent': a.id})
    c = self.env['test_new_api.recursive'].create({'name': 'C', 'parent': b.id})
    d = self.env['test_new_api.recursive'].create({'name': 'D', 'parent': c.id})
    chain = (a, b, c, d)

    initial_names = ('A', 'A / B', 'A / B / C', 'A / B / C / D')
    for rec, expected in zip(chain, initial_names):
        self.assertEqual(rec.context_dependent_name, expected)

    # now let's switch to another context to update the dependency
    a.with_context(bozo=42).name = 'A1'

    renamed = ('A1', 'A1 / B', 'A1 / B / C', 'A1 / B / C / D')
    for rec, expected in zip(chain, renamed):
        self.assertEqual(rec.context_dependent_name, expected)
def test_12_cascade(self):
    """ test computed field depending on computed field """
    demo_message = self.env.ref('test_new_api.message_0_0')
    self.env.invalidate_all()
    # 'double_size' is read first, right after the cache invalidation
    doubled = demo_message.double_size
    self.assertEqual(doubled, demo_message.size)

    # 'baz' must follow every change made on 'foo'
    cascade = self.env['test_new_api.cascade'].create({'foo': "Hi"})
    self.assertEqual(cascade.baz, "<[Hi]>")
    cascade.foo = "Ho"
    self.assertEqual(cascade.baz, "<[Ho]>")
def test_12_unlink_cascade_active_store(self):
    """ Test that `unlink` on many records doesn't raise a RecursionError
    with a stored related `active` field.
    """
    inactive_message = self.env['test_new_api.message'].create({
        'active': False,
    })
    # 101 email messages, all pointing to the same inactive message
    email_vals_list = [{'message': inactive_message.id}] * 101
    self.env['test_new_api.emailmessage'].create(email_vals_list)

    inactive_message.unlink()
def test_12_unlink_cascade_ir_rule_using_related(self):
    """ Test that `unlink` on many records doesn't raise a RecursionError
    when there is an ir.rule with a stored related field to compute.
    """
    inactive_message = self.env['test_new_api.message'].create({
        'active': False,
    })
    # 101 email messages, all pointing to the same inactive message
    email_vals_list = [{'message': inactive_message.id}] * 101
    self.env['test_new_api.emailmessage'].create(email_vals_list)

    # Create an ir.rule, which forces to flush field 'active'
    rule_vals = {
        'model_id': self.env['ir.model']._get_id('test_new_api.emailmessage'),
        'groups': [self.env.ref('base.group_user').id],
        'domain_force': str([('active', '=', False)]),
    }
    self.env['ir.rule'].create(rule_vals)

    # the deletion is performed as the demo user
    inactive_message.with_user(self.user_demo).unlink()
def test_12_dynamic_depends(self):
    """ check that field dependencies read from a config parameter are
    re-evaluated when models are set up again
    """
    Model = self.registry['test_new_api.compute.dynamic.depends']
    # initially the field has no dependency at all
    self.assertEqual(self.registry.field_depends[Model.full_name], ())

    # the dependencies of full_name are stored in a config parameter
    self.env['ir.config_parameter'].set_param('test_new_api.full_name', 'name1,name2')

    # this must re-evaluate the field's dependencies
    self.env.flush_all()
    self.registry.setup_models(self.cr)
    self.assertEqual(self.registry.field_depends[Model.full_name], ('name1', 'name2'))
def test_12_one2many_reference_domain(self):
    """ check the domain list of one2many fields on 'inverse_m2o_ref' """
    model = self.env['test_new_api.inverse_m2o_ref']
    expected_domains = [
        # this field gets a condition on 'res_model' ...
        ('model_ids', [('res_model', '=', model._name)]),
        # ... while this one gets no condition at all
        ('model_computed_ids', []),
    ]
    for field_name, domain in expected_domains:
        o2m_field = model._fields[field_name]
        self.assertEqual(o2m_field.get_domain_list(model), domain)
def test_13_inverse(self):
    """ test inverse computation of fields """
    Category = self.env['test_new_api.category']
    abel = Category.create({'name': 'Abel'})
    beth = Category.create({'name': 'Bethany'})
    cath = Category.create({'name': 'Catherine'})
    dean = Category.create({'name': 'Dean'})
    ewan = Category.create({'name': 'Ewan'})
    finn = Category.create({'name': 'Finnley'})
    gabe = Category.create({'name': 'Gabriel'})
    self.assertEqual(ewan.display_name, "Ewan")

    # assigning a full path to display_name is expected to rebuild the
    # chain of parents and to rename the record with the last path item
    ewan.display_name = "Abel / Bethany / Catherine / Erwan"

    self.assertEqual(beth.parent, abel)
    self.assertEqual(cath.parent, beth)
    self.assertEqual(ewan.parent, cath)
    self.assertEqual(ewan.name, "Erwan")

    # check create/write with several records
    # ('display_name' wins over 'name' given in the same values)
    vals = {'name': 'None', 'display_name': 'Foo'}
    foo1, foo2 = Category.create([vals, vals])
    self.assertEqual(foo1.name, 'Foo')
    self.assertEqual(foo2.name, 'Foo')

    (foo1 + foo2).write({'display_name': 'Bar'})
    self.assertEqual(foo1.name, 'Bar')
    self.assertEqual(foo2.name, 'Bar')

    # create/write on 'foo' should only invoke the compute method
    # (the list passed in the context is what the assertions below inspect)
    log = []
    model = self.env['test_new_api.compute.inverse'].with_context(log=log)
    record = model.create({'foo': 'Hi'})
    self.assertEqual(record.foo, 'Hi')
    self.assertEqual(record.bar, 'Hi')
    self.assertCountEqual(log, ['compute'])

    log.clear()
    record.write({'foo': 'Ho'})
    self.assertEqual(record.foo, 'Ho')
    self.assertEqual(record.bar, 'Ho')
    self.assertCountEqual(log, ['compute'])

    # create/write on 'bar' should only invoke the inverse method
    log.clear()
    record = model.create({'bar': 'Hi'})
    self.assertEqual(record.foo, 'Hi')
    self.assertEqual(record.bar, 'Hi')
    self.assertCountEqual(log, ['inverse'])

    log.clear()
    record.write({'bar': 'Ho'})
    self.assertEqual(record.foo, 'Ho')
    self.assertEqual(record.bar, 'Ho')
    self.assertCountEqual(log, ['inverse'])

    # Test compatibility multiple compute/inverse fields
    log = []
    model = self.env['test_new_api.multi_compute_inverse'].with_context(log=log)
    record = model.create({
        'bar1': '1',
        'bar2': '2',
        'bar3': '3',
    })
    self.assertEqual(record.foo, '1/2/3')
    self.assertEqual(record.bar1, '1')
    self.assertEqual(record.bar2, '2')
    self.assertEqual(record.bar3, '3')
    self.assertCountEqual(log, ['inverse1', 'inverse23'])

    # writing 'bar2' and 'bar3' only: a single 'inverse23' entry is expected
    log.clear()
    record.write({'bar2': '4', 'bar3': '5'})
    self.assertEqual(record.foo, '1/4/5')
    self.assertEqual(record.bar1, '1')
    self.assertEqual(record.bar2, '4')
    self.assertEqual(record.bar3, '5')
    self.assertCountEqual(log, ['inverse23'])

    # writing 'bar1' and 'bar2': one entry per inverse is expected
    log.clear()
    record.write({'bar1': '6', 'bar2': '7'})
    self.assertEqual(record.foo, '6/7/5')
    self.assertEqual(record.bar1, '6')
    self.assertEqual(record.bar2, '7')
    self.assertEqual(record.bar3, '5')
    self.assertCountEqual(log, ['inverse1', 'inverse23'])

    # writing 'foo': the three 'bar' fields are derived with one 'compute'
    log.clear()
    record.write({'foo': 'A/B/C'})
    self.assertEqual(record.foo, 'A/B/C')
    self.assertEqual(record.bar1, 'A')
    self.assertEqual(record.bar2, 'B')
    self.assertEqual(record.bar3, 'C')
    self.assertCountEqual(log, ['compute'])

    # corner case: write on a field that is marked to compute
    log.clear()
    # writing on 'foo' marks 'bar1', 'bar2', 'bar3' to compute
    record.write({'foo': '1/2/3'})
    self.assertCountEqual(log, [])
    # writing on 'bar3' must force the computation before updating
    record.write({'bar3': 'X'})
    self.assertCountEqual(log, ['compute', 'inverse23'])
    self.assertEqual(record.foo, '1/2/X')
    self.assertEqual(record.bar1, '1')
    self.assertEqual(record.bar2, '2')
    self.assertEqual(record.bar3, 'X')
    # reading the fields above must not have logged anything more
    self.assertCountEqual(log, ['compute', 'inverse23'])

    log.clear()
    # writing on 'foo' marks 'bar1', 'bar2', 'bar3' to compute
    record.write({'foo': 'A/B/C'})
    self.assertCountEqual(log, [])
    # writing on 'bar1', 'bar2', 'bar3' discards the computation
    record.write({'bar1': 'X', 'bar2': 'Y', 'bar3': 'Z'})
    self.assertCountEqual(log, ['inverse1', 'inverse23'])
    self.assertEqual(record.foo, 'X/Y/Z')
    self.assertEqual(record.bar1, 'X')
    self.assertEqual(record.bar2, 'Y')
    self.assertEqual(record.bar3, 'Z')
    # reading the fields above must not have logged anything more
    self.assertCountEqual(log, ['inverse1', 'inverse23'])
def test_13_inverse_access(self):
    """ test access rights on inverse fields """
    foo = self.env['test_new_api.category'].create({'name': 'Foo'})
    # a freshly created user, checked not to be in group 'base.group_system'
    user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
    self.assertFalse(user.has_group('base.group_system'))
    # add group on non-stored inverse field
    self.patch(type(foo).display_name, 'groups', 'base.group_system')
    # assigning the field as that user must now be refused
    with self.assertRaises(AccessError):
        foo.with_user(user).display_name = 'Forbidden'
def test_13_inverse_with_unlink(self):
    """ test x2many delete command combined with an inverse field

    A single ``write`` both assigns the inverse field ``country_id`` and
    deletes one of the child companies; the country must still be updated.
    """
    country1 = self.env['res.country'].create({'name': 'test country', 'code': 'ZV'})
    country2 = self.env['res.country'].create({'name': 'other country', 'code': 'ZX'})
    # x2many commands are built with the Command helpers, like in the other
    # tests of this file, rather than with raw tuples of magic numbers
    company = self.env['res.company'].create({
        'name': 'test company',
        'child_ids': [
            Command.create({'name': 'Child Company 1'}),
            Command.create({'name': 'Child Company 2'}),
        ]
    })
    child_company = company.child_ids[0]

    # check first that the field has an inverse and is not stored
    field = type(company).country_id
    self.assertFalse(field.store)
    self.assertTrue(field.inverse)

    company.write({'country_id': country1.id})
    self.assertEqual(company.country_id, country1)

    # inverse field and delete command in the same write
    company.write({
        'country_id': country2.id,
        'child_ids': [Command.delete(child_company.id)],
    })
    self.assertEqual(company.country_id, country2)
def test_14_search(self):
    """ test search on computed fields """
    discussion = self.env.ref('test_new_api.discussion_0')

    # determine message sizes
    sizes = {message.size for message in discussion.messages}

    # for each size, searching on the computed field must give the same
    # messages as filtering the discussion's messages by hand
    for size in sizes:
        domain = [('discussion', '=', discussion.id), ('size', '<=', size)]
        found = self.env['test_new_api.message'].search(domain)

        expected = self.env['test_new_api.message'].browse()
        for message in discussion.messages:
            if message.size <= size:
                expected += message

        self.assertEqual(found, expected)
def test_15_constraint(self):
    """ test new-style Python constraints """
    discussion = self.env.ref('test_new_api.discussion_0')
    self.env.flush_all()

    # remove oneself from discussion participants: we can no longer create
    # messages in discussion
    discussion.participants -= self.env.user
    with self.assertRaises(ValidationError):
        self.env['test_new_api.message'].create({'discussion': discussion.id, 'body': 'Whatever'})

    # make sure that assertRaises() does not leave fields to recompute
    self.assertFalse(self.env.fields_to_compute())

    # put back oneself into discussion participants: now we can create
    # messages in discussion
    discussion.participants += self.env.user
    self.env['test_new_api.message'].create({'discussion': discussion.id, 'body': 'Whatever'})

    # check constraint on recomputed field
    # (renaming the discussion to "X" is expected to be refused while it
    # has messages)
    self.assertTrue(discussion.messages)
    with self.assertRaises(ValidationError):
        discussion.name = "X"
def test_15_constraint_inverse(self):
    """ test constraint method on normal field and field with inverse """
    log = []
    model = self.env['test_new_api.compute.inverse'].with_context(log=log, log_constraint=True)

    scenarios = [
        # (values to create, values to write, expected log entries)
        # create/write with normal field only
        ({'baz': 'Hi'}, {'baz': 'Ho'}, ['constraint']),
        # create/write with inverse field only
        ({'bar': 'Hi'}, {'bar': 'Ho'}, ['inverse', 'constraint']),
        # create/write with both fields only
        ({'bar': 'Hi', 'baz': 'Hi'}, {'bar': 'Ho', 'baz': 'Ho'}, ['inverse', 'constraint']),
    ]
    for create_vals, write_vals, expected_log in scenarios:
        log.clear()
        record = model.create(create_vals)
        self.assertCountEqual(log, expected_log)

        log.clear()
        record.write(write_vals)
        self.assertCountEqual(log, expected_log)
def test_16_compute_unassigned(self):
    """ check field values when a compute method does not assign them """
    model = self.env['test_new_api.compute.unassigned']

    # same expectations on a real record, then on a new record
    record_factories = (
        lambda: model.create({}),
        lambda: model.new(),
    )
    for make_record in record_factories:
        record = make_record()
        # reading 'bar' fails ...
        with self.assertRaises(ValueError):
            record.bar
        # ... while these fields fall back to False
        for field_name in ('bare', 'bars', 'bares'):
            self.assertEqual(getattr(record, field_name), False)
def test_16_compute_unassigned_access_error(self):
    """ check that flushing recomputed fields does not raise an access
    error on records the current user cannot read
    """
    # create two records
    records = self.env['test_new_api.compute.unassigned'].create([{}, {}])
    self.env.flush_all()

    # alter access rights: regular users cannot read 'records'
    access = self.env.ref('test_new_api.access_test_new_api_compute_unassigned')
    access.perm_read = False
    self.env.flush_all()

    # switch to environment with user demo
    records = records.with_user(self.user_demo)

    # check that records are not accessible
    with self.assertRaises(AccessError):
        records[0].bars
    with self.assertRaises(AccessError):
        records[1].bars

    # Modify the records and flush() changes with the current environment:
    # this should not trigger an access error, whatever the order in which
    # records are considered.  It may fail in the following scenario:
    #  - mark field 'bars' to compute on records
    #  - access records[0].bars
    #     - recompute bars on records (both) -> assign records[0] only
    #     - return records[0].bars from cache
    #  - access records[1].bars
    #     - recompute nothing (done already)
    #     - records[1].bars is not in cache
    #     - fetch records[1].bars -> access error
    records[0].foo = "assign"
    records[1].foo = "x"
    self.env.flush_all()

    # try the other way around, too
    self.env.invalidate_all()
    records[0].foo = "x"
    records[1].foo = "assign"
    self.env.flush_all()
def test_17_compute_depends_on_many2many(self):
    """ test which users are marked to compute 'group_count' when the
    many2many relation between groups and users is modified
    """
    User = self.env['test_new_api.user']
    Group = self.env['test_new_api.group']

    user1, user2, user3 = User.create([{}, {}, {}])
    group = Group.create({'user_ids': [Command.link(user1.id)]})
    self.env.flush_all()
    field = type(user1).group_count

    def pending():
        # records currently marked to compute for field 'group_count'
        return self.env.records_to_compute(field)

    self.assertFalse(pending())

    # link the three users (user1 is linked already): user2 and user3 only
    link_all = [Command.link(user.id) for user in (user1, user2, user3)]
    group.write({'user_ids': link_all})
    self.assertEqual(pending(), user2 + user3)

    # unlink user2: user2 only
    self.env.flush_all()
    group.write({'user_ids': [Command.unlink(user2.id)]})
    self.assertEqual(pending(), user2)

    # replace the users by user1 and user2: user2 and user3 only
    self.env.flush_all()
    group.write({'user_ids': [Command.set([user1.id, user2.id])]})
    self.assertEqual(pending(), user2 + user3)

    # link the group from the side of user3: user3 only
    self.env.flush_all()
    user3.write({'group_ids': [Command.link(group.id)]})
    self.assertEqual(pending(), user3)

    # same idea with new records; here only the computed values are checked
    new_user1 = User.new({})
    new_user2 = User.new({})
    new_group = Group.new({'user_ids': [new_user1.id]})
    self.assertEqual(new_user1.group_count, 1)
    self.assertEqual(new_user2.group_count, 0)
    new_group.user_ids += new_user2
    self.assertEqual(new_user1.group_count, 1)
    self.assertEqual(new_user2.group_count, 1)
def test_20_float(self):
    """ test rounding of float fields """
    record = self.env['test_new_api.mixed'].create({})
    query = "SELECT 1 FROM test_new_api_mixed WHERE id=%s AND number=%s"

    # each case: (value written, value expected in database, value expected on record)
    cases = [
        # 2.49609375 is an exact float, and must be rounded to 2.5
        (2.49609375, '2.5', 2.5),
        # 1.1 is 1.1000000000000000888178420 as a float, and must be 1.1 in database
        (1.1, '1.1', 1.1),
    ]
    for written, in_database, on_record in cases:
        record.write({'number': written})
        self.env.flush_all()
        # the query only returns a row if the stored number matches
        self.cr.execute(query, [record.id, in_database])
        self.assertTrue(self.cr.rowcount)
        self.assertEqual(record.number, on_record)
def test_21_float_digits(self):
    """ test the 'digits' entry in the description of field 'number2' """
    decimal_precision = self.env.ref('test_new_api.decimal_new_api_number')
    all_descriptions = self.env['test_new_api.mixed'].fields_get()
    number2_description = all_descriptions['number2']
    # the second item of 'digits' must be the one of the precision record
    expected_digits = (16, decimal_precision.digits)
    self.assertEqual(number2_description['digits'], expected_digits)
def check_monetary(self, record, amount, currency, msg=None):
    """ Check the currency and amount of ``record``, both on the record and
    in table ``test_new_api_mixed``.

    :param record: the record to check
    :param amount: the amount that has been assigned to the record
    :param currency: the expected currency; when falsy, no rounding is expected
    :param msg: optional message passed to the assertions on the amount
    """
    if not currency:
        # without currency, the amount is expected as is
        rounded = stored = amount
    else:
        # with a currency, accept both roundings of the amount
        rounded = currency.round(amount)
        stored = float(float_repr(rounded, currency.decimal_places))

    # currency on the record
    self.assertEqual(record.currency_id, currency)

    # amount on the record
    self.assertIn(record.amount, [rounded, stored], msg)

    # amount in the database
    self.env.flush_all()
    self.cr.execute('SELECT amount FROM test_new_api_mixed WHERE id=%s', [record.id])
    row = self.cr.fetchone()
    self.assertEqual(row[0], stored, msg)
def test_20_monetary(self):
    """ test monetary fields """
    Mixed = self.env['test_new_api.mixed']
    currency = self.env['res.currency'].with_context(active_test=False)
    amount = 14.70126

    for rounding in [0.01, 0.0001, 1.0, 0]:
        # pick the currency to use for this rounding
        if not rounding:
            # rounding=0 stands for "no currency"
            currency = currency.browse()
        else:
            currency = currency.search([('rounding', '=', rounding)], limit=1)
            self.assertTrue(currency, "No currency found for rounding %s" % rounding)

        def check(rec, label):
            # all checks below use the same amount and the current currency
            self.check_monetary(rec, amount, currency, label)

        # case 1: amount and currency given at creation
        record = Mixed.create({'amount': amount, 'currency_id': currency.id})
        check(record, 'create(amount, currency)')

        # case 2: amount assigned on the record
        record.amount = 0
        record.amount = amount
        check(record, 'assign(amount)')

        # case 3: amount and currency written together
        record.write({'amount': 0, 'currency_id': False})
        record.write({'amount': amount, 'currency_id': currency.id})
        check(record, 'write(amount, currency)')

        # case 4: amount written alone
        record.write({'amount': 0})
        record.write({'amount': amount})
        check(record, 'write(amount)')

        # case 5: amount written on several records at once
        records = record + Mixed.create({'currency_id': currency.id})
        records.write({'amount': 0})
        records.write({'amount': amount})
        for rec in records:
            check(rec, 'multi write(amount)')
def test_20_monetary_opw_2223134(self):
    """ test monetary fields with cache override """
    Order = self.env['test_new_api.monetary_order']
    usd = self.env.ref('base.USD')

    def check(expected):
        # 'record' is the order created below; check its total on the
        # record first, then in the database
        self.assertEqual(record.total, expected)
        self.env.flush_all()
        self.cr.execute('SELECT total FROM test_new_api_monetary_order WHERE id=%s', [record.id])
        (stored_total,) = self.cr.fetchone()
        self.assertEqual(stored_total, expected)

    # create an order with one line, which computes the total
    record = Order.create({
        'currency_id': usd.id,
        'line_ids': [Command.create({'subtotal': 1.0})],
    })
    check(1.0)

    # Replace the line by a new one.  Deleting the line clears the cache;
    # recomputing 'total' must then prefetch record.currency_id without
    # corrupting the new value put in cache.
    replace_line = [
        Command.delete(record.line_ids.id),
        Command.create({'subtotal': 1.0}),
    ]
    record.write({'line_ids': replace_line})
    check(1.0)
def test_20_monetary_related(self):
    """ test value rounding with related currency """
    usd = self.env.ref('base.USD')
    base = self.env['test_new_api.monetary_base'].create({'base_currency_id': usd.id})
    related = self.env['test_new_api.monetary_related'].create({
        'monetary_id': base.id,
        'total': 1/3,
    })

    # read the stored value directly from the database
    cr = self.env.cr
    cr.execute(
        "SELECT total FROM test_new_api_monetary_related WHERE id=%s",
        related.ids,
    )
    (stored_total,) = cr.fetchone()
    self.assertEqual(stored_total, .33)
def test_20_like(self):
    """ test filtered_domain() on char fields. """
    record = self.env['test_new_api.multi.tag'].create({'name': 'Foo'})
    conditions = (('like', 'F'), ('ilike', 'f'))

    # the name given at creation matches both conditions
    for operator, operand in conditions:
        self.assertTrue(record.filtered_domain([('name', operator, operand)]))

    # neither another name nor an empty name matches them
    for name in ('Bar', False):
        record.name = name
        for operator, operand in conditions:
            self.assertFalse(record.filtered_domain([('name', operator, operand)]))
def test_21_date(self):
    """ test date fields """
    record = self.env['test_new_api.mixed'].create({})
    may_first = date(2012, 5, 1)

    # None is an accepted value, and empties the field
    record.date = None
    self.assertFalse(record.date)

    # a date object is accepted
    record.date = may_first
    self.assertEqual(record.date, may_first)

    # note (DLE P41): assigning a datetime object to a date field is now
    # supported, hence there is no check for a TypeError here

    # strings in the default date and datetime formats are accepted, and
    # strings in another format are rejected
    record.date = '2012-05-01'
    self.assertEqual(record.date, may_first)
    record.date = "2012-05-01 10:45:00"
    self.assertEqual(record.date, may_first)
    with self.assertRaises(ValueError):
        record.date = '12-5-1'

    upper_bounds = ('2012-05-02', date(2012, 5, 2), datetime(2012, 5, 2, 12, 0, 0))

    # filtered_domain() on a record with a date
    for bound in upper_bounds:
        self.assertTrue(record.filtered_domain([('date', '<', bound)]))
    self.assertTrue(record.filtered_domain([('date', '!=', False)]))
    self.assertFalse(record.filtered_domain([('date', '=', False)]))

    # filtered_domain() on a record without date
    record.date = None
    for bound in upper_bounds:
        self.assertFalse(record.filtered_domain([('date', '<', bound)]))
    self.assertFalse(record.filtered_domain([('date', '!=', False)]))
    self.assertTrue(record.filtered_domain([('date', '=', False)]))
def test_21_datetime(self):
    """ test datetime fields """
    # the current datetime is expected to come without microseconds
    for _ in range(10):
        self.assertEqual(fields.Datetime.now().microsecond, 0)

    record = self.env['test_new_api.mixed'].create({})
    midnight = datetime(2012, 5, 1)
    six_am = datetime(2012, 5, 1, 6)

    # a falsy value empties the field
    record.moment = None
    self.assertFalse(record.moment)

    # strings: date only, date and time, and an invalid format
    record.moment = '2012-05-01'
    self.assertEqual(record.moment, midnight)
    record.moment = '2012-05-01 06:00:00'
    self.assertEqual(record.moment, six_am)
    with self.assertRaises(ValueError):
        record.moment = '12-5-1'

    # date and datetime objects
    record.moment = date(2012, 5, 1)
    self.assertEqual(record.moment, midnight)
    record.moment = six_am
    self.assertEqual(record.moment, six_am)

    # filtered_domain() on a record with a value
    for bound in ('2012-05-02', date(2012, 5, 2), datetime(2012, 5, 1, 12, 0, 0)):
        self.assertTrue(record.filtered_domain([('moment', '<', bound)]))
    self.assertTrue(record.filtered_domain([('moment', '!=', False)]))
    self.assertFalse(record.filtered_domain([('moment', '=', False)]))

    # filtered_domain() on a record without value
    record.moment = None
    for bound in ('2012-05-02', date(2012, 5, 2), datetime(2012, 5, 2, 12, 0, 0)):
        self.assertFalse(record.filtered_domain([('moment', '<', bound)]))
    self.assertFalse(record.filtered_domain([('moment', '!=', False)]))
    self.assertTrue(record.filtered_domain([('moment', '=', False)]))
def test_21_date_datetime_helpers(self):
    """ test date/datetime fields helpers """
    day = fields.Date.from_string("2077-10-23")
    moment = fields.Datetime.from_string("2077-10-23 09:42:00")

    # add() and subtract()
    self.assertEqual(add(day, days=5), date(2077, 10, 28))
    self.assertEqual(add(moment, seconds=10), datetime(2077, 10, 23, 9, 42, 10))
    self.assertEqual(subtract(day, months=1), date(2077, 9, 23))
    self.assertEqual(subtract(moment, hours=2), datetime(2077, 10, 23, 7, 42, 0))

    # ---- start_of() ----

    # year
    self.assertEqual(start_of(day, 'year'), date(2077, 1, 1))
    self.assertEqual(start_of(moment, 'year'), datetime(2077, 1, 1))

    # quarter: one month taken in each of the first three quarters, then
    # the month of the reference values (October)
    quarter_starts = [date(2077, 1, 1), date(2077, 4, 1), date(2077, 7, 1), date(2077, 10, 1)]
    for month, expected in zip((3, 5, 7), quarter_starts):
        self.assertEqual(start_of(day.replace(month=month), 'quarter'), expected)
    last_quarter_start = quarter_starts[3]
    self.assertEqual(start_of(day, 'quarter'), last_quarter_start)
    self.assertEqual(start_of(moment, 'quarter'), datetime.combine(last_quarter_start, time.min))

    # month
    self.assertEqual(start_of(day, 'month'), date(2077, 10, 1))
    self.assertEqual(start_of(moment, 'month'), datetime(2077, 10, 1))

    # week
    self.assertEqual(start_of(day, 'week'), date(2077, 10, 18))
    self.assertEqual(start_of(moment, 'week'), datetime(2077, 10, 18))

    # day
    self.assertEqual(start_of(day, 'day'), day)
    self.assertEqual(start_of(moment, 'day'), moment.replace(hour=0, minute=0, second=0))

    # hour: rejected for a date, supported for a datetime
    with self.assertRaises(ValueError):
        start_of(day, 'hour')
    self.assertEqual(start_of(moment, 'hour'), moment.replace(minute=0, second=0))

    # unknown granularity
    with self.assertRaises(ValueError):
        start_of(moment, 'poop')

    # ---- end_of() ----

    # year
    last_day_of_year = day.replace(month=12, day=31)
    self.assertEqual(end_of(day, 'year'), last_day_of_year)
    self.assertEqual(end_of(moment, 'year'), datetime.combine(last_day_of_year, time.max))

    # quarter: same approach as for start_of()
    quarter_ends = [date(2077, 3, 31), date(2077, 6, 30), date(2077, 9, 30), date(2077, 12, 31)]
    for month, expected in zip((2, 4, 9), quarter_ends):
        self.assertEqual(end_of(day.replace(month=month), 'quarter'), expected)
    last_quarter_end = quarter_ends[3]
    self.assertEqual(end_of(day, 'quarter'), last_quarter_end)
    self.assertEqual(end_of(moment, 'quarter'), datetime.combine(last_quarter_end, time.max))

    # month
    self.assertEqual(end_of(day, 'month'), day.replace(day=31))
    self.assertEqual(end_of(moment, 'month'), datetime.combine(date(2077, 10, 31), time.max))

    # week
    self.assertEqual(end_of(day, 'week'), date(2077, 10, 24))
    self.assertEqual(end_of(moment, 'week'), datetime.combine(date(2077, 10, 24), time.max))

    # day
    self.assertEqual(end_of(day, 'day'), day)
    self.assertEqual(end_of(moment, 'day'), datetime.combine(moment, time.max))

    # hour: rejected for a date, supported for a datetime
    with self.assertRaises(ValueError):
        end_of(day, 'hour')
    # last microsecond of the hour of 'moment'
    end_of_hour = datetime.combine(moment, time.max).replace(hour=moment.hour)
    self.assertEqual(end_of(moment, 'hour'), end_of_hour)

    # unknown granularity
    with self.assertRaises(ValueError):
        end_of(moment, 'crap')
def test_22_selection(self):
    """ test selection fields """
    # field 'state': its selection is a list
    with_list = self.env['test_new_api.selection'].create({})
    self.assertIsInstance(with_list._fields['state'].selection, list)

    # field 'lang': its selection is a string (the name of a method)
    with_method = self.env['test_new_api.mixed'].create({})
    self.assertIsInstance(with_method._fields['lang'].selection, str)

    # assigning a value
    with_list.state = 'foo'
    some_lang = self.env['res.lang'].search([], limit=1)
    with_method.lang = some_lang.code

    # assigning None empties the field
    with_list.state = None
    self.assertFalse(with_list.state)
    with_method.lang = None
    self.assertFalse(with_method.lang)

    # an unknown value is rejected in the list case only: the second
    # assignment below is expected to pass
    with self.assertRaises(ValueError):
        with_list.state = 'zz_ZZ'
    with_method.lang = 'zz_ZZ'
def test_23_relation(self):
    """ test relation fields """
    demo = self.user_demo
    message = self.env.ref('test_new_api.message_0_0')

    def assert_in_env(record, env):
        # both the record and its discussion must belong to env
        self.assertEqual(record.env, env)
        self.assertEqual(record.discussion.env, env)

    assert_in_env(message, self.env)

    # making another environment does not affect existing records
    demo_env = self.env(user=demo)
    self.assertNotEqual(demo_env, self.env)
    assert_in_env(message, self.env)

    # the message "migrated" to user demo lives in demo_env
    demo_message = message.with_user(demo)
    assert_in_env(demo_message, demo_env)

    # See YTI FIXME
    self.env.invalidate_all()

    # assign a copy of the discussion as the message's discussion; the
    # message and its discussion must remain in self.env
    message.discussion = message.discussion.copy({'name': 'Copy'})
    assert_in_env(message, self.env)
def test_24_reference(self):
    """ test reference fields. """
    record = self.env['test_new_api.mixed'].create({})
    user = self.env.user

    # None is an accepted value, and empties the field
    record.reference = None
    self.assertFalse(record.reference)

    # a user is accepted
    record.reference = user
    self.assertEqual(record.reference, user)

    # a partner is accepted, too
    record.reference = user.partner_id
    self.assertEqual(record.reference, user.partner_id)

    # a record of a model named 'ir.*' is rejected
    with self.assertRaises(ValueError):
        record.reference = self.env['ir.model'].search([], limit=1)
def test_25_related(self):
    """ test related fields. """
    message = self.env.ref('test_new_api.message_0_0')
    discussion = message.discussion

    # the related field is neither stored nor readonly
    related_field = message._fields['discussion_name']
    self.assertFalse(related_field.store)
    self.assertFalse(related_field.readonly)

    # the related field reflects the discussion's name...
    self.assertEqual(message.discussion_name, discussion.name)

    # ... also after renaming the discussion
    discussion.name = 'Foo'
    self.assertEqual(message.discussion_name, 'Foo')

    # renaming through the related field updates the discussion
    message.discussion_name = 'Bar'
    self.assertEqual(discussion.name, 'Bar')
    self.assertEqual(message.discussion_name, 'Bar')

    # renaming through the related field, on two messages at once
    discussion1 = discussion.create({'name': 'X1'})
    discussion2 = discussion.create({'name': 'X2'})
    current_user = self.env.user
    discussion1.participants = current_user
    discussion2.participants = current_user
    message1 = message.create({'discussion': discussion1.id})
    message2 = message.create({'discussion': discussion2.id})
    self.assertEqual(message1.discussion_name, 'X1')
    self.assertEqual(message2.discussion_name, 'X2')

    both_messages = message1 + message2
    both_messages.write({'discussion_name': 'X3'})
    self.assertEqual(discussion1.name, 'X3')
    self.assertEqual(discussion2.name, 'X3')

    # searching on the related field gives the same result as searching
    # on the corresponding path
    by_related = self.env['test_new_api.message'].search([('discussion_name', '=', 'Bar')])
    by_path = self.env['test_new_api.message'].search([('discussion.name', '=', 'Bar')])
    self.assertEqual(by_related, by_path)

    # the 'help' of the related field is the one of its target field
    related_description = message.fields_get(['discussion_name'])['discussion_name']
    target_description = discussion.fields_get(['name'])['name']
    self.assertEqual(related_description['help'], target_description['help'])
def test_25_related_attributes(self):
    """ test the attributes of related fields """
    Foo = self.registry['test_new_api.foo']
    Bar = self.registry['test_new_api.bar']

    # the target field has trim=False, unlike the default of its class
    target = Foo.text
    self.assertFalse(target.trim, "The target field is defined with trim=False")
    self.assertTrue(type(target).trim, "By default, a Char field has trim=True")

    # text1 does not set 'trim' in its definition, and takes it from the target
    related1 = Bar.text1
    self.assertFalse(related1.trim, "The related field retrieves trim=False from target")

    # text2 sets trim=True in its definition, and keeps it
    related2 = Bar.text2
    self.assertTrue(related2.trim, "The related field was defined with trim=True")
def test_25_related_single(self):
    """ test related fields with a single field in the path. """
    record = self.env['test_new_api.related'].create({'name': 'A'})
    related_names = ('related_name', 'related_related_name')

    # reading
    for fname in related_names:
        self.assertEqual(record[fname], record.name)

    # searching: every related field gives the same result as 'name'
    by_name = self._search(record, [('name', '=', 'A')])
    self.assertIn(record, by_name)
    for fname in related_names:
        found = self._search(record, [(fname, '=', 'A')])
        self.assertEqual(found, by_name)

    # writing: every related field updates 'name'
    for fname, new_name in zip(related_names, ('B', 'C')):
        record.write({fname: new_name})
        self.assertEqual(record.name, new_name)
def test_25_related_multi(self):
    """ test write() on several related fields based on a common computed field. """
    Foo = self.env['test_new_api.foo']
    foo = Foo.create({'name': 'A', 'value1': 1, 'value2': 2})
    oof = Foo.create({'name': 'B', 'value1': 1, 'value2': 2})
    bar = self.env['test_new_api.bar'].create({'name': 'A'})

    # bar points to foo, and exposes its values
    self.assertEqual(bar.foo, foo)
    self.assertEqual(bar.value1, 1)
    self.assertEqual(bar.value2, 2)

    # writing both values on bar (with an empty cache) updates foo
    self.env.invalidate_all()
    bar.write({'value1': 3, 'value2': 4})
    self.assertEqual(foo.value1, 3)
    self.assertEqual(foo.value2, 4)

    # after renaming bar, a search on 'foo' should flush 'name'
    bar.name = 'B'
    self.assertEqual(bar.foo, oof)
    found = bar.search([('foo', 'in', oof.ids)])
    self.assertIn(bar, found)
def test_25_one2many_inverse_related(self):
    """ test the fields of left and right records when a middle record
    linking them is created and deleted
    """
    left = self.env['test_new_api.trigger.left'].create({})
    right = self.env['test_new_api.trigger.right'].create({})

    def assert_not_linked():
        self.assertFalse(left.right_id)
        self.assertFalse(right.left_ids)
        self.assertFalse(right.left_size)

    assert_not_linked()

    # Creating the middle record should trigger left.right_id by going
    # through middle.left_id, and right.left_size by going through
    # left.right_id after its computation!
    middle = self.env['test_new_api.trigger.middle'].create({
        'left_id': left.id,
        'right_id': right.id,
    })
    self.assertEqual(left.right_id, right)
    self.assertEqual(right.left_ids, left)
    self.assertEqual(right.left_size, 1)

    # Deleting the middle record should trigger left.right_id by going
    # through middle.left_id, and right.left_size by going through
    # left.right_id before its computation!
    middle.unlink()
    assert_not_linked()
def test_26_inherited(self):
    """ test inherited fields. """
    # some of the fields that users inherit from their partner
    inherited_names = ('is_company', 'name', 'email', 'country_id')
    all_users = self.env['res.users'].search([])
    for user in all_users:
        partner = user.partner_id
        for fname in inherited_names:
            # same value by attribute access and by item access
            self.assertEqual(getattr(user, fname), getattr(partner, fname))
            self.assertEqual(user[fname], partner[fname])
def test_27_company_dependent(self):
""" test company-dependent fields. """
# Company-dependent field variants should handle 0, '' and NULL database values
# in the same way as their 'normal' (non-company-dependent) variants.
# This section relies on there being no company defaults, so it needs to run first.
null_record = self.env['test_new_api.company'].create({})
null_record_normal = self.env['test_new_api.mixed'].create({})
null_record.invalidate_recordset()
null_record_normal.invalidate_recordset()
# each item is (field on 'test_new_api.company', corresponding field on
# 'test_new_api.mixed', empty value to write on both)
field_correspondence = [
('foo', 'foo', ''),
('text', 'text', ''),
('date', 'date', False),
('moment', 'moment', False),
('truth', 'truth', False),
('count', 'count', 0),
('phi', 'number2', 0.0),
('html1', 'comment1', ''),
]
# Check null values
for field, normal_field, value_to_write in field_correspondence:
self.assertEqual(null_record[field], null_record_normal[normal_field])
null_record[field] = null_record_normal[normal_field] = value_to_write
# Check empty / 0 values
null_record.invalidate_recordset()
null_record_normal.invalidate_recordset()
for field, normal_field, _ in field_correspondence:
self.assertEqual(null_record[field], null_record_normal[normal_field])
# consider three companies
company0 = self.env.ref('base.main_company')
company1 = self.env['res.company'].create({'name': 'A'})
company2 = self.env['res.company'].create({'name': 'B'})
# create one user per company
# (every user is allowed in the three companies, only 'company_id' differs)
user0 = self.env['res.users'].create({
'name': 'Foo', 'login': 'foo', 'company_id': company0.id,
'company_ids': [Command.set([company0.id, company1.id, company2.id])]})
user1 = self.env['res.users'].create({
'name': 'Bar', 'login': 'bar', 'company_id': company1.id,
'company_ids': [Command.set([company0.id, company1.id, company2.id])]})
user2 = self.env['res.users'].create({
'name': 'Baz', 'login': 'baz', 'company_id': company2.id,
'company_ids': [Command.set([company0.id, company1.id, company2.id])]})
# create values for many2one field
tag0 = self.env['test_new_api.multi.tag'].create({'name': 'Qux'})
tag1 = self.env['test_new_api.multi.tag'].create({'name': 'Quux'})
tag2 = self.env['test_new_api.multi.tag'].create({'name': 'Quuz'})
# create default values for the company-dependent fields
# ('foo' gets a general default and one specific to company1, 'tag_id'
# gets a general default only)
self.env['ir.default'].set('test_new_api.company', 'foo', 'default')
self.env['ir.default'].set('test_new_api.company', 'foo', 'default1', company_id=company1.id)
self.env['ir.default'].set('test_new_api.company', 'tag_id', tag0.id)
# assumption: users don't have access to 'ir.default'
accesses = self.env['ir.model.access'].search([('model_id.model', '=', 'ir.default')])
accesses.write(dict.fromkeys(['perm_read', 'perm_write', 'perm_create', 'perm_unlink'], False))
# create/modify a record, and check the value for each user
record = self.env['test_new_api.company'].create({
'foo': 'main',
'date': '1932-11-09',
'moment': '1932-11-09 00:00:00',
'tag_id': tag1.id,
})
# user0 reads the values given at creation, while user1 and user2 read the
# defaults set above ('date' and 'moment' have none, and read as False)
self.assertEqual(record.with_user(user0).foo, 'main')
self.assertEqual(record.with_user(user1).foo, 'default1')
self.assertEqual(record.with_user(user2).foo, 'default')
self.assertEqual(str(record.with_user(user0).date), '1932-11-09')
self.assertEqual(record.with_user(user1).date, False)
self.assertEqual(record.with_user(user2).date, False)
self.assertEqual(str(record.with_user(user0).moment), '1932-11-09 00:00:00')
self.assertEqual(record.with_user(user1).moment, False)
self.assertEqual(record.with_user(user2).moment, False)
self.assertEqual(record.with_user(user0).tag_id, tag1)
self.assertEqual(record.with_user(user1).tag_id, tag0)
self.assertEqual(record.with_user(user2).tag_id, tag0)
# write as user1: only the values read by user1 are expected to change
record.with_user(user1).write({
'foo': 'alpha',
'date': '1932-12-10',
'moment': '1932-12-10 23:59:59',
'tag_id': tag2.id,
})
self.assertEqual(record.with_user(user0).foo, 'main')
self.assertEqual(record.with_user(user1).foo, 'alpha')
self.assertEqual(record.with_user(user2).foo, 'default')
self.assertEqual(str(record.with_user(user0).date), '1932-11-09')
self.assertEqual(str(record.with_user(user1).date), '1932-12-10')
self.assertEqual(record.with_user(user2).date, False)
self.assertEqual(str(record.with_user(user0).moment), '1932-11-09 00:00:00')
self.assertEqual(str(record.with_user(user1).moment), '1932-12-10 23:59:59')
self.assertEqual(record.with_user(user2).moment, False)
self.assertEqual(record.with_user(user0).tag_id, tag1)
self.assertEqual(record.with_user(user1).tag_id, tag2)
self.assertEqual(record.with_user(user2).tag_id, tag0)
# regression: duplicated records caused values to be browse(browse(id))
recs = record.create({}) + record + record
self.env.invalidate_all()
for rec in recs.with_user(user0):
self.assertIsInstance(rec.tag_id.id, int)
# unlink value of a many2one (tag2), and check again
tag2.unlink()
self.assertEqual(record.with_user(user0).tag_id, tag1)
self.assertEqual(record.with_user(user1).tag_id, tag0.browse())
self.assertEqual(record.with_user(user2).tag_id, tag0)
# assign False as user1: user1 reads False, not the default value
record.with_user(user1).foo = False
self.assertEqual(record.with_user(user0).foo, 'main')
self.assertEqual(record.with_user(user1).foo, False)
self.assertEqual(record.with_user(user2).foo, 'default')
# assign as user0 but with company1: the value is expected for user1,
# whose company is company1, and not for user0
record.with_user(user0).with_company(company1).foo = 'beta'
self.env.invalidate_all()
self.assertEqual(record.with_user(user0).foo, 'main')
self.assertEqual(record.with_user(user1).foo, 'beta')
self.assertEqual(record.with_user(user2).foo, 'default')
# add group on company-dependent field
self.assertFalse(user0.has_group('base.group_system'))
self.patch(type(record).foo, 'groups', 'base.group_system')
with self.assertRaises(AccessError):
record.with_user(user0).foo = 'forbidden'
user0.write({'groups_id': [Command.link(self.env.ref('base.group_system').id)]})
record.with_user(user0).foo = 'yes we can'
# add ir.rule to prevent access on record
# (the variable 'rule' itself is not used afterwards in this test)
self.assertTrue(user0._is_internal())
rule = self.env['ir.rule'].create({
'model_id': self.env['ir.model']._get_id(record._name),
'groups': [self.env.ref('base.group_user').id],
'domain_force': str([('id', '!=', record.id)]),
})
with self.assertRaises(AccessError):
record.with_user(user0).foo = 'forbidden'
# create company record and attribute
company_record = self.env['test_new_api.company'].create({'foo': 'ABC'})
attribute_record = self.env['test_new_api.company.attr'].create({
'company': company_record.id,
'quantity': 1,
})
self.assertEqual(attribute_record.bar, 'ABC')
# change quantity, 'bar' should recompute to 'ABCABC'
attribute_record.quantity = 2
self.assertEqual(attribute_record.bar, 'ABCABC')
# change company field 'foo', 'bar' should recompute to 'DEFDEF'
company_record.foo = 'DEF'
self.assertEqual(attribute_record.company.foo, 'DEF')
self.assertEqual(attribute_record.bar, 'DEFDEF')
# a low-privileged user should be able to search on company_dependent fields
company_record.env.user.groups_id -= self.env.ref('base.group_system')
self.assertFalse(company_record.env.user.has_group('base.group_system'))
company_records = self.env['test_new_api.company'].search([('foo', '=', 'DEF')])
self.assertEqual(len(company_records), 1)
def test_27_company_dependent_bool_integer_float(self):
    """ test reading company-dependent boolean, integer and float fields
    when the database holds NULL values
    """
    main_company = self.env.ref('base.main_company')
    other_company = self.env['res.company'].create({'name': 'A'})
    record = self.env['test_new_api.company'].create({})
    record.invalidate_recordset()
    cr = self.env.cr
    select_columns = "SELECT truth, count, phi FROM test_new_api_company WHERE id = %s"

    def assert_columns_are_null():
        cr.execute(select_columns, (record.id,))
        self.assertEqual(cr.fetchone(), (None, None, None))

    def assert_fallback_values():
        # in both companies, the fields read as False, 0 and 0.0
        for company in [main_company, other_company]:
            company_record = record.with_company(company)
            self.assertEqual(company_record.truth, False)
            self.assertEqual(company_record.count, 0)
            self.assertEqual(company_record.phi, 0.0)

    # nothing has been written: the columns are NULL
    assert_columns_are_null()
    assert_fallback_values()

    # writing values equivalent to the fallback leaves the columns NULL
    record.write({'truth': False, 'count': 0, 'phi': 0})
    record.invalidate_recordset()
    assert_columns_are_null()

    # NULL doesn't block read: store a null value for the main company in
    # each column, and read again
    null_for_main_company = json.dumps({str(main_company.id): None})
    cr.execute("UPDATE test_new_api_company SET truth = %s, count = %s, phi = %s WHERE id = %s", (
        null_for_main_company,
        null_for_main_company,
        null_for_main_company,
        record.id,
    ))
    assert_fallback_values()
def test_28_company_dependent_search(self):
    """ Test the search on company-dependent fields in all corner cases.

    This assumes that filtered_domain() correctly filters records when
    its domain refers to company-dependent fields.
    """
    IrDefault = self.env['ir.default']
    Model = self.env['test_new_api.company']

    # create 4 records for all cases: two with explicit truthy values, one
    # with an explicit falsy value, and one without an explicit value
    records = Model.create([{}] * 4)
    # extra record which is assigned the field's company-dependent fallback
    # value in test_cases()
    record_fallback = Model.create({})

    # For each field, we assign values to the records, and test a number of
    # searches. The search cases are given by comparison operators, and for
    # each operator, we test a number of possible operands. Every search()
    # returns a subset of the records, and we compare it to an equivalent
    # search performed by filtered_domain().
    def test_field(field_name, truthy_values, operations):
        """ Run the search cases of ``operations`` on ``field_name`` three
        times: without ir.default, with an ir.default set to False, and
        with an ir.default set to ``truthy_values[0]``.

        :param field_name: name of the company-dependent field to test
        :param truthy_values: list of two truthy values for the field
        :param operations: dict mapping a domain operator to its operands
        """
        # assign explicit values to all records except the last one
        # (zip() stops after the three values given)
        for rec, val in zip(records, truthy_values + [False]):
            rec[field_name] = val

        # test without default value
        test_cases(field_name, operations)

        # set default value to False
        IrDefault.set(Model._name, field_name, False)
        self.env.flush_all()
        self.env.invalidate_all()
        for rec, val in zip(records, truthy_values + [False]):
            rec[field_name] = val
        test_cases(field_name, operations, False)

        # set default value to truthy_values[0]
        IrDefault.set(Model._name, field_name, truthy_values[0])
        self.env.flush_all()
        self.env.invalidate_all()
        for rec, val in zip(records, truthy_values + [False]):
            rec[field_name] = val
        test_cases(field_name, operations, truthy_values[0])

    def test_cases(field_name, operations, default=None):
        """ Check every ``(field_name, operator, value)`` domain built from
        ``operations`` with self._search().

        :param default: only used to label the subtests
        """
        model = self.env['test_new_api.company']
        field = model._fields[field_name]
        field_fallback = field.get_company_dependent_fallback(model)
        record_fallback[field_name] = field_fallback
        current_thread = threading.current_thread()
        for operator, values in operations.items():
            for value in values:
                domain = [(field_name, operator, value)]
                # when the fallback value does not satisfy the domain, the
                # query is expected to contain an "IS NOT NULL" condition
                # on the field's column
                company_dependent_column_not_null = not record_fallback.filtered_domain(domain)
                if company_dependent_column_not_null:
                    with self.subTest(domain=domain, default=default):
                        # first search warms up the caches
                        Model.search([('id', 'in', records.ids)] + domain)
                        # second search is the one whose queries are counted
                        current_thread.query_count = 0
                        current_thread.query_time = 0
                        Model.search([('id', 'in', records.ids)] + domain)
                        if current_thread.query_count:
                            # parent_of and child_of may need extra queries;
                            # only the last query is checked
                            expected_contained_sqls = [''] * (current_thread.query_count - 1) + [f'"test_new_api_company"."{field_name}" IS NOT NULL']
                            with self.assertQueriesContain(expected_contained_sqls):
                                Model.search([('id', 'in', records.ids)] + domain)

                # TODO complement of dates, child_of and parent_of are not working correctly, skip for now
                test_complement = "date" not in field.type and operator not in ['child_of', 'parent_of']
                with self.subTest(domain=domain, default=default):
                    self._search(
                        Model,
                        [('id', 'in', records.ids)] + domain,
                        [('id', 'in', records.ids)],
                        test_complement=test_complement,
                    )

    # boolean fields
    test_field('truth', [True, True], {
        '=': (True, False),
        '!=': (True, False),
    })

    # integer fields
    test_field('count', [10, -2], {
        '=': (10, -2, 0, False),
        '!=': (10, -2, 0, False),
        '<': (10, -2, 0),
        '>=': (10, -2, 0),
        '<=': (10, -2, 0),
        '>': (10, -2, 0),
    })

    # float fields
    test_field('phi', [1.61803, -1], {
        '=': (1.61803, -1, 0, False),
        '!=': (1.61803, -1, 0, False),
        '<': (1.61803, -1, 0),
        '>=': (1.61803, -1, 0),
        '<=': (1.61803, -1, 0),
        '>': (1.61803, -1, 0),
    })

    # char fields
    test_field('foo', ['qwer', 'azer'], {
        'like': ('qwer', 'azer'),
        'ilike': ('qwer', 'azer'),
        'not like': ('qwer', 'azer'),
        'not ilike': ('qwer', 'azer'),
        '=': ('qwer', 'azer', False),
        '!=': ('qwer', 'azer', False),
        'not in': (['qwer', 'azer'], ['qwer', False], [False], []),
        'in': (['qwer', 'azer'], ['qwer', False], [False], []),
    })

    # date fields
    date1, date2 = date(2021, 11, 22), date(2021, 11, 23)
    test_field('date', [date1, date2], {
        '=': (date1, date2, False),
        '!=': (date1, date2, False),
        '<': (date1, date2),
        '>=': (date1, date2),
        '<=': (date1, date2),
        '>': (date1, date2),
    })

    # datetime fields
    moment1, moment2 = datetime(2021, 11, 22), datetime(2021, 11, 23)
    test_field('moment', [moment1, moment2], {
        '=': (moment1, moment2, False),
        '!=': (moment1, moment2, False),
        '<': (moment1, moment2),
        '>=': (moment1, moment2),
        '<=': (moment1, moment2),
        '>': (moment1, moment2),
    })

    # many2one fields
    tag1, tag2 = self.env['test_new_api.multi.tag'].create([{'name': 'one'}, {'name': 'two'}])
    test_field('tag_id', [tag1.id, tag2.id], {
        'like': (tag1.name, tag2.name),
        'ilike': (tag1.name, tag2.name),
        'not like': (tag1.name, tag2.name),
        'not ilike': (tag1.name, tag2.name),
        '=': (tag1.id, tag2.id, False),
        '!=': (tag1.id, tag2.id, False),
        'in': ([tag1.id, tag2.id], [tag2.id, False], [False], []),
        'not in': ([tag1.id, tag2.id], [tag2.id, False], [False], []),
        'any': ([('name', '=', tag1.name)], [('name', '=', False)], []),
        'not any': ([('name', '=', tag1.name)], [('name', '=', False)], []),
    })

    # hierarchical operators: build a chain company0 > company1 > company2,
    # mirrored on the companies' partners
    company0 = self.env.ref('base.main_company')
    company1 = self.env['res.company'].create({'name': 'A1', 'parent_id': company0.id})
    company2 = self.env['res.company'].create({'name': 'B1', 'parent_id': company1.id})
    company1.partner_id.parent_id = company0.partner_id
    company2.partner_id.parent_id = company1.partner_id
    self.env.invalidate_all()
    test_field('company_id', [company1.id, company2.id], {
        'child_of': (company0.id, company1.id, company2.id),
        'parent_of': (company0.id, company1.id, company2.id),
    })
    # the values stored in partner_id must be partner ids (not company ids),
    # so that they belong to the partner hierarchy used by the operands
    test_field('partner_id', [company1.partner_id.id, company2.partner_id.id], {
        'child_of': (company0.partner_id.id, company1.partner_id.id, company2.partner_id.id),
        'parent_of': (company0.partner_id.id, company1.partner_id.id, company2.partner_id.id),
    })
def test_29_company_dependent_html(self):
company0 = self.env.ref('base.main_company')
company1 = self.env['res.company'].create({'name': 'A'})
company2 = self.env['res.company'].create({'name': 'B'})
user0 = self.env['res.users'].create({
'name': 'Foo', 'login': 'foo', 'company_id': company0.id,
'company_ids': [Command.set([company0.id, company1.id, company2.id])]})
user1 = self.env['res.users'].create({
'name': 'Bar', 'login': 'bar', 'company_id': company1.id,
'company_ids': [Command.set([company0.id, company1.id, company2.id])]})
user2 = self.env['res.users'].create({
'name': 'Baz', 'login': 'baz', 'company_id': company2.id,
'company_ids': [Command.set([company0.id, company1.id, company2.id])]})
some_ugly_html_0 = """
Oops this should maybe be sanitized
% if object.some_field and not object.oriented:
% if object.other_field:
${object.mako_thing}
|
This is some html.
% endif
%if object.dummy_field:
user0
%endif"""
some_ugly_html_1 = """Oops this should maybe be sanitized
% if object.some_field and not object.oriented:
% if object.other_field:
${object.mako_thing}
|
This is some html.
% endif
%if object.dummy_field:
user1
%endif"""
record = self.env['test_new_api.company'].create({
'html1': some_ugly_html_0,
'html2': some_ugly_html_0,
})
self.assertEqual(record.with_user(user0).html1, some_ugly_html_0, 'Error in HTML field: content was sanitized but field has sanitize=False')
self.assertEqual(record.with_user(user1).html1, False)
self.assertEqual(record.with_user(user2).html1, False)
# sanitize should have closed tags left open in the original html for user0
self.assertIn('
', record.with_user(user0).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True')
self.assertIn('', record.with_user(user0).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True')
self.assertNotIn('
', record.with_user(user1).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True')
self.assertIn('', record.with_user(user1).html2, 'Error in HTML field: content does not seem to have been sanitized despise sanitize=True')
self.assertNotIn('
.__get__(existing)
# -> records._fetch_field()
# -> records.fetch(['categories'])
# -> records.check_access('read')
# -> records._check_access('read')
# -> records.sudo().filtered_domain(...)
# -> .__get__(existing)
# -> records._fetch_field()
# -> records.fetch(['name', ...])
# -> ONE QUERY to read ['name', ...] of records
# -> ONE QUERY for deleted.exists() / code: forbidden = missing.exists()
# -> ONE QUERY for records.exists() / code: self = self.exists()
# -> ONE QUERY to read the many2many of existing
existing.categories
# this one must trigger a MissingError
with self.assertRaises(MissingError):
deleted.categories
# special case: should not fail
Discussion.browse([None]).read(['categories'])
def test_40_real_vs_new(self):
    """ Compare field access on a real record, a new record with an
    origin, and a new record without origin. """
    Category = self.env['test_new_api.category']
    stored = Category.create({'name': 'Foo'})
    with_origin = Category.new({'name': 'Bar'}, origin=stored)
    without_origin = Category.new({'name': 'Baz'})

    # non-computed non-stored field: default value
    stored, with_origin, without_origin = (
        rec.with_context(default_dummy='WTF')
        for rec in (stored, with_origin, without_origin)
    )
    for rec in (stored, with_origin, without_origin):
        self.assertEqual(rec.dummy, 'WTF')

    # non-computed stored field: origin or default if no origin
    stored, with_origin, without_origin = (
        rec.with_context(default_color=42)
        for rec in (stored, with_origin, without_origin)
    )
    for rec, color in zip((stored, with_origin, without_origin), (0, 0, 42)):
        self.assertEqual(rec.color, color)

    # computed non-stored field: always computed
    for rec, label in zip((stored, with_origin, without_origin), ('Foo', 'Bar', 'Baz')):
        self.assertEqual(rec.display_name, label)

    # computed stored field: origin or computed if no origin
    Recursive = self.env['test_new_api.recursive']
    stored = Recursive.create({'name': 'Foo'})
    with_origin = Recursive.new({'name': 'Bar'}, origin=stored)
    without_origin = Recursive.new({'name': 'Baz'})
    triple = (stored, with_origin, without_origin)
    for rec, label in zip(triple, ('Foo', 'Bar', 'Baz')):
        self.assertEqual(rec.display_name, label)

    # computed stored field with recomputation: always computed
    renamed = ('Fool', 'Barr', 'Bazz')
    for rec, label in zip(triple, renamed):
        rec.name = label
    for rec, label in zip(triple, renamed):
        self.assertEqual(rec.display_name, label)
def test_40_new_defaults(self):
    """ Check the default values of records built with new(). """
    current_user = self.env.user
    thread = self.env.ref('test_new_api.discussion_0')

    # fields that are not given take their default value
    draft_msg = self.env['test_new_api.message'].new({'body': "XXX"})
    self.assertFalse(draft_msg.id)
    self.assertEqual(draft_msg.body, "XXX")
    self.assertEqual(draft_msg.author, current_user)

    # assigning fields must not have side effects on real records
    draft_msg.discussion = thread
    draft_msg.body = "YYY"
    self.assertEqual(draft_msg.discussion, thread)
    self.assertEqual(draft_msg.body, "YYY")
    self.assertNotIn(draft_msg, thread.messages)

    # computed fields follow the assigned values
    expected_name = f"[{thread.name}] {current_user.name}"
    self.assertEqual(draft_msg.name, expected_name)
    self.assertEqual(draft_msg.size, 3)

    # x2many fields with a default given in the context
    Category = self.env['test_new_api.category']
    first_cat = Category.create({'name': "Cat1"})
    second_cat = Category.create({'name': "Cat2"})
    thread = thread.with_context(default_categories=[Command.link(first_cat.id)])

    # without explicit value, the default applies
    draft_disc = thread.new({'name': "Foo"})
    self.assertEqual(draft_disc.categories._origin, first_cat)

    # an explicit value takes precedence over the default
    explicit_vals = {'name': "Foo", 'categories': [Command.link(second_cat.id)]}
    draft_disc = thread.new(explicit_vals)
    self.assertEqual(draft_disc.categories._origin, second_cat)
def test_40_new_fields(self):
    """ Check the relational fields of a discussion built with new(). """
    Message = self.env['test_new_api.message']
    Category = self.env['test_new_api.category']
    msg0 = Message.create({'body': "XXX"})
    msg1 = Message.create({'body': "WWW"})
    cat0 = Category.create({'name': 'AAA'})
    cat1 = Category.create({'name': 'DDD'})

    # build a new discussion using all kinds of relational commands
    message_commands = [
        Command.link(msg0.id),
        Command.link(msg1.id),
        Command.update(msg1.id, {'body': "YYY"}),
        Command.create({'body': "ZZZ"}),
    ]
    category_commands = [
        Command.link(cat0.id),
        Command.link(cat1.id),
        Command.update(cat1.id, {'name': "BBB"}),
        Command.create({'name': "CCC"}),
    ]
    draft = self.env['test_new_api.discussion'].new({
        'name': "Stuff",
        'moderator': self.env.uid,
        'messages': message_commands,
        'categories': category_commands,
    })
    self.assertFalse(draft.id)

    # the value of a many2one field is an actual record
    self.assertEqual(draft.moderator.id, self.env.uid)

    # the values of x2many fields are new records
    draft_msg0, draft_msg1, draft_msg2 = draft.messages
    draft_msgs = (draft_msg0, draft_msg1, draft_msg2)
    for line in draft_msgs:
        self.assertFalse(line.id)

    draft_cat0, draft_cat1, draft_cat2 = draft.categories
    draft_cats = (draft_cat0, draft_cat1, draft_cat2)
    for line in draft_cats:
        self.assertFalse(line.id)

    # the inverse field of the x2many is set on the new lines only
    for line in draft_msgs:
        self.assertEqual(line.discussion, draft)
    for real_msg in (msg0, msg1):
        self.assertFalse(real_msg.discussion)

    for line in draft_cats:  # add other discussions
        self.assertEqual(line.discussions, draft)
    for real_cat in (cat0, cat1):
        self.assertNotIn(draft, real_cat.discussions)

    # new lines are linked to their origin, if any
    self.assertEqual(draft_msg0._origin, msg0)
    self.assertEqual(draft_msg1._origin, msg1)
    self.assertFalse(draft_msg2._origin)
    self.assertEqual(draft_cat0._origin, cat0)
    self.assertEqual(draft_cat1._origin, cat1)
    self.assertFalse(draft_cat2._origin)

    # field values are either specific, or taken from the origin
    for line, body in zip(draft_msgs, ("XXX", "YYY", "ZZZ")):
        self.assertEqual(line.body, body)
    self.assertEqual(msg0.body, "XXX")
    self.assertEqual(msg1.body, "WWW")

    for line, name in zip(draft_cats, ("AAA", "BBB", "CCC")):
        self.assertEqual(line.name, name)
    self.assertEqual(cat0.name, "AAA")
    self.assertEqual(cat1.name, "DDD")

    # special case: many2one fields that define _inherits
    EmailMessage = self.env['test_new_api.emailmessage']
    draft_email = EmailMessage.new({'body': "XXX"})
    self.assertFalse(draft_email.id)
    self.assertTrue(draft_email.message)
    self.assertFalse(draft_email.message.id)
    self.assertEqual(draft_email.body, "XXX")

    draft_email = EmailMessage.new({'message': msg0.id})
    self.assertFalse(draft_email.id)
    self.assertFalse(draft_email._origin)
    self.assertFalse(draft_email.message.id)
    self.assertEqual(draft_email.message._origin, msg0)
    self.assertEqual(draft_email.body, "XXX")

    # this must not end up in an infinite recursion
    draft._convert_to_write(draft._cache)
def test_40_new_convert_to_write(self):
    """ Check convert_to_write() on the many2many of a new record. """
    user_ids = self.env.user.ids
    draft = self.env['test_new_api.discussion'].new({
        'name': "Stuff",
        'moderator': self.env.uid,
        'participants': [(6, 0, user_ids)],
    })

    # Load the user groups in the cache of the new record
    draft.participants.groups_id

    # The cached groups must not show up in the result of convert_to_write:
    # nothing actually changed, the values are the same except that
    # self.env.user.groups_id._ids = (Id1, Id2, ...) whereas
    # draft.participants.groups_id._ids = (NewId(origin=Id1), NewId(origin=Id2), ...)
    participants_field = draft._fields.get("participants")

    # the test relies on the absence of an inverse field for discussions
    # on res_users
    self.assertFalse(draft.pool.field_inverses[participants_field])

    written = participants_field.convert_to_write(draft["participants"], draft)
    self.assertEqual(written, [(6, 0, self.env.user.ids)])
def test_40_new_inherited_fields(self):
    """ Check that an inherited field stays in sync on a new record. """
    draft = self.env['test_new_api.emailmessage'].new({'body': 'XXX'})

    def check_body(expected):
        # both the record and its parent must expose the same body
        self.assertEqual(draft.body, expected)
        self.assertEqual(draft.message.body, expected)

    check_body('XXX')

    # write through the inherited field
    draft.body = 'YYY'
    check_body('YYY')

    # write on the parent record
    draft.message.body = 'ZZZ'
    check_body('ZZZ')
def test_40_new_ref_origin(self):
    """ Check new records created with a ref and/or an origin. """
    Discussion = self.env['test_new_api.discussion']
    make = Discussion.new

    # pairwise equality expected when the last two records are identical
    equality_matrix = [
        1, 0, 0, 0,
        0, 1, 0, 0,
        0, 0, 1, 1,
        0, 0, 1, 1,
    ]

    # new records with identical/different refs
    drafts = make() + make(ref='a') + make(ref='b') + make(ref='b')
    self.assertEqual([x == y for x in drafts for y in drafts], equality_matrix)
    for draft in drafts:
        self.assertFalse(draft._origin)

    # new records with identical/different origins
    a, b = Discussion.create([{'name': "A"}, {'name': "B"}])
    drafts = make() + make(origin=a) + make(origin=b) + make(origin=b)
    self.assertEqual([x == y for x in drafts for y in drafts], equality_matrix)
    self.assertFalse(drafts[0]._origin)
    for position, origin in ((1, a), (2, b), (3, b)):
        self.assertEqual(drafts[position]._origin, origin)
    self.assertEqual(drafts._origin, a + b + b)
    self.assertEqual(drafts._origin._origin, a + b + b)

    # a record with a ref differs from a record with an origin
    by_ref = make(ref='a')
    by_origin = make(origin=b)
    self.assertNotEqual(by_ref, by_origin)

    # new discussion based on an existing discussion
    real_disc = self.env.ref('test_new_api.discussion_0')
    draft_disc = real_disc.new(origin=real_disc)
    self.assertFalse(draft_disc.id)
    self.assertEqual(draft_disc._origin, real_disc)
    self.assertEqual(draft_disc.name, real_disc.name)
    # many2one field
    self.assertEqual(draft_disc.moderator, real_disc.moderator)
    # one2many field
    self.assertTrue(draft_disc.messages)
    self.assertNotEqual(draft_disc.messages, real_disc.messages)
    self.assertEqual(draft_disc.messages._origin, real_disc.messages)
    # many2many field
    self.assertTrue(draft_disc.participants)
    self.assertNotEqual(draft_disc.participants, real_disc.participants)
    self.assertEqual(draft_disc.participants._origin, real_disc.participants)

    # a many2one given as a dict of values yields a new record, which has
    # the given 'id' as origin (when there is one)
    draft_msg = real_disc.messages.new({
        'discussion': {'name': real_disc.name},
    })
    self.assertTrue(draft_msg.discussion)
    self.assertFalse(draft_msg.discussion.id)
    self.assertFalse(draft_msg.discussion._origin)

    draft_msg = real_disc.messages.new({
        'discussion': {'name': real_disc.name, 'id': real_disc.id},
    })
    self.assertTrue(draft_msg.discussion)
    self.assertFalse(draft_msg.discussion.id)
    self.assertEqual(draft_msg.discussion._origin, real_disc)

    # check convert_to_write
    tag = self.env['test_new_api.multi.tag'].create({'name': 'Foo'})
    line_vals = {'tags': [(6, 0, tag.ids)]}
    multi = self.env['test_new_api.multi'].create({
        'lines': [(0, 0, line_vals)],
    })
    draft_multi = multi.new(origin=multi)
    self.assertEqual(draft_multi.lines.tags._origin, multi.lines.tags)
    write_vals = draft_multi._convert_to_write(draft_multi._cache)
    self.assertEqual(write_vals['lines'], [(6, 0, multi.lines.ids)])
def test_41_new_compute(self):
    """ Check that computed fields are recomputed on new records. """
    two_lines = [Command.create({'quantity': 1}), Command.create({'quantity': 1})]
    real_move = self.env['test_new_api.move'].create({'line_ids': two_lines})
    self.env.flush_all()

    first_line = real_move.line_ids[0]
    draft_move = real_move.new(origin=real_move)
    draft_line = first_line.new(origin=first_line)

    # move_id comes from the origin of the line
    self.assertEqual(draft_line.move_id, real_move)
    self.assertEqual(draft_move.quantity, 2)
    self.assertEqual(real_move.quantity, 2)

    # changing draft_line must recompute draft_move, although
    # draft_line.move_id is not draft_move!
    draft_line.quantity = 2
    self.assertEqual(draft_line.move_id, real_move)
    self.assertEqual(draft_move.quantity, 3)
    self.assertEqual(real_move.quantity, 2)
def test_41_new_one2many(self):
    """ Check commands on the one2many field of a new record. """
    real_move = self.env['test_new_api.move'].create({})
    real_line = self.env['test_new_api.move_line'].create({
        'move_id': real_move.id,
        'quantity': 1,
    })
    self.env.flush_all()

    draft_move = real_move.new(origin=real_move)
    draft_line = real_line.new(origin=real_line)
    self.assertEqual(draft_move.line_ids, draft_line)

    # remove the line, and add a brand new one
    commands = [Command.delete(draft_line.id), Command.create({'quantity': 2})]
    draft_move.line_ids = commands
    self.assertEqual(len(draft_move.line_ids), 1)
    self.assertFalse(draft_move.line_ids.id)
    self.assertEqual(draft_move.line_ids.quantity, 2)

    # assign the real line to a new move that has no origin
    draft_move = real_move.new()
    draft_move.line_ids = real_line
    self.assertFalse(draft_move.line_ids.id)
    self.assertEqual(draft_move.line_ids._origin, real_line)
    self.assertEqual(draft_move.line_ids.move_id, draft_move)
def test_41_new_many2many(self):
    """ Check how a many2many field and its inverse are kept consistent on
    new records, and how many queries this requires. """
    group = self.env['test_new_api.group'].create({})
    user0 = self.env['test_new_api.user'].create({'group_ids': [Command.link(group.id)]})
    new_user0 = user0.new(origin=user0)
    new_group = group.new(origin=group)
    # empty the cache, so that the query counts below start from scratch
    self.env.invalidate_all()

    # creating new_user1 should not fetch new_group.user_ids, which is the
    # inverse of field new_user1.group_ids
    with self.assertQueryCount(0):
        new_user1 = self.env['test_new_api.user'].new({'group_ids': [Command.link(group.id)]})
        self.assertEqual(new_user1.group_ids, new_group)

    # accessing new_group.user_ids should fetch group.user_ids and patch
    # new_group.user_ids
    with self.assertQueryCount(1):
        self.assertEqual(new_group.user_ids, new_user0 + new_user1)

    # creating new_user2 should patch new_group.user_ids immediately, since
    # it is in cache
    with self.assertQueryCount(0):
        new_user2 = self.env['test_new_api.user'].new({'group_ids': [Command.link(group.id)]})
        self.assertEqual(new_user2.group_ids, new_group)
        self.assertEqual(new_group.user_ids, new_user0 + new_user1 + new_user2)

    # the patches on new_group.user_ids should not have changed group.user_ids
    self.assertEqual(group.user_ids, user0)
@mute_logger('odoo.addons.base.models.ir_model')
def test_41_new_related(self):
    """ Check a related field that starts on a new record. """
    # forbid the demo user to read discussions
    self.env.ref('test_new_api.access_discussion').write({'perm_read': False})

    # environment of the demo user
    demo_env = self.env(user=self.user_demo)
    self.assertEqual(demo_env.user.login, "demo")

    # new message made by the demo user
    thread = self.env.ref('test_new_api.discussion_0')
    draft_msg = demo_env['test_new_api.message'].new({'discussion': thread})
    self.assertEqual(draft_msg.discussion, thread)

    # access the related field discussion_name
    self.assertEqual(draft_msg.discussion.env, demo_env)
    self.assertEqual(draft_msg.discussion_name, thread.name)

    # DLE P75: message.discussion.name is put in the cache as sudo thanks to the computation of message.discussion_name
    # As we decided that now if we had the chance to access the value at some point in the code, and that it was stored in the cache
    # it's not a big deal to no longer raise the accesserror, as we had the chance to get the value at some point
    # with self.assertRaises(AccessError):
    #     message.discussion.name
@mute_logger('odoo.addons.base.models.ir_model')
def test_42_new_related(self):
    """ Check a related field that goes through new records. """
    # forbid the demo user to read discussions
    self.env.ref('test_new_api.access_discussion').write({'perm_read': False})

    # environment of the demo user
    demo_env = self.env(user=self.user_demo)
    self.assertEqual(demo_env.user.login, "demo")

    # new discussion and new message, both made by the demo user
    draft_disc = demo_env['test_new_api.discussion'].new({'name': 'Stuff'})
    draft_msg = demo_env['test_new_api.message'].new({'discussion': draft_disc})
    self.assertEqual(draft_msg.discussion, draft_disc)

    # access the related field discussion_name
    self.assertNotEqual(draft_msg.sudo().env, draft_msg.env)
    self.assertEqual(draft_msg.discussion_name, draft_disc.name)
def test_43_new_related(self):
    """ Check a related one2many field on a new record. """
    parent_vals = {
        'name': 'Foo',
        'child_ids': [Command.create({'name': 'Bar'})],
    }
    parent = self.env['res.partner'].create(parent_vals)

    draft = self.env['test_new_api.multi'].new()
    draft.partner = parent
    child_names = draft.partners.mapped('name')
    self.assertEqual(child_names, ['Bar'])
def test_50_defaults(self):
    """ Check the result of default_get(). """
    # only 'author' is expected to have a default among those fields
    field_names = ['discussion', 'body', 'author', 'size']
    message_defaults = self.env['test_new_api.message'].default_get(field_names)
    self.assertEqual(message_defaults, {'author': self.env.uid})

    mixed_defaults = self.env['test_new_api.mixed'].default_get(['number'])
    self.assertEqual(mixed_defaults, {'number': 3.14})
def test_50_search_many2one(self):
    """ Check a search that traverses a path of computed fields. """
    domain = [('author_partner.name', '=', 'Marc Demo')]
    found = self.env['test_new_api.message'].search(domain)
    expected = self.env.ref('test_new_api.message_0_1')
    self.assertEqual(found, expected)
def test_51_search_many2one_ordered(self):
    """ test search on many2one ordered by id """
    # ordering by the many2one 'discussion' is expected to produce a single
    # query that orders on the column "discussion" itself
    with self.assertQueries(['''
SELECT "test_new_api_message"."id" FROM "test_new_api_message"
WHERE ("test_new_api_message"."active" = %s)
ORDER BY "test_new_api_message"."discussion"
''']):
        self.env['test_new_api.message'].search([], order='discussion')
def test_52_search_many2one_active_test(self):
    """ Check searches on a many2one pointing to active/inactive records. """
    Model = self.env['test_new_api.model_active_field']
    active_parent = Model.create({'name': 'Parent'})
    child_of_active = Model.create({'parent_id': active_parent.id})
    inactive_parent = Model.create({'name': 'Parent', 'active': False})
    child_of_inactive = Model.create({'parent_id': inactive_parent.id})

    both_children = child_of_active + child_of_inactive
    cases = [
        ([('parent_id.name', '=', 'Parent')], both_children),
        ([('parent_id', '=', 'Parent')], both_children),
        # weird semantics: active_parent is in both results but doesn't have a parent_id
        (
            [('parent_id', 'child_of', active_parent.id)],
            active_parent + child_of_active,
        ),
        (
            [('parent_id', 'child_of', 'Parent')],
            active_parent + child_of_active + child_of_inactive,
        ),
    ]
    for domain, expected in cases:
        self.assertEqual(self._search(Model, domain), expected)
def test_60_one2many_domain(self):
    """ Check the cache consistency of one2many fields having a domain. """
    thread = self.env.ref('test_new_api.discussion_0')
    first_msg = thread.messages[0]
    self.assertNotIn(first_msg, thread.important_messages)

    # flagging the message makes it appear in the filtered one2many
    first_msg.important = True
    self.assertIn(first_msg, thread.important_messages)

    # writing on very_important_messages should call its domain method
    self.assertIn(first_msg, thread.very_important_messages)
    clear_all = [Command.clear()]
    thread.write({'very_important_messages': clear_all})
    self.assertFalse(thread.very_important_messages)
    self.assertFalse(first_msg.exists())
def test_60_many2many_domain(self):
    """ Check the cache consistency of a many2many field having a domain. """
    label = self.env['test_new_api.multi.tag'].create({'name': 'bar'})
    holder = self.env['test_new_api.multi'].create({'tags': label.ids})
    self.env.flush_all()
    self.env.invalidate_all()

    tags_field = type(holder).tags
    self.assertEqual(tags_field.domain, [('name', 'ilike', 'a')])

    # initially, the tag belongs to the many2many
    self.assertIn(label, holder.tags)

    # rename the tag; it should no longer belong to the many2many
    label.name = 'foo'
    self.assertNotIn(label, holder.tags)

    # rename the tag again; it should belong to the many2many again
    label.name = 'baz'
    self.assertIn(label, holder.tags)
def test_70_x2many_write(self):
    """ Check writes on one2many fields sharing the same comodel. """
    thread = self.env.ref('test_new_api.discussion_0')
    # See YTI FIXME
    self.env.invalidate_all()

    Message = self.env['test_new_api.message']

    def check_counts(total, important):
        # 'very_important_messages' is expected to follow 'important_messages'
        self.assertEqual(len(thread.messages), total)
        self.assertEqual(len(thread.important_messages), important)
        self.assertEqual(len(thread.very_important_messages), important)

    # There must be 3 messages, 0 important
    check_counts(3, 0)

    question_vals = {
        'body': 'What is the answer?',
        'important': True,
    }
    thread.important_messages = [Command.create(question_vals)]
    # There must be 4 messages, 1 important
    check_counts(4, 1)

    thread.very_important_messages |= Message.new({
        'body': '42',
        'important': True,
    })
    # There must be 5 messages, 2 important
    check_counts(5, 2)
def test_70_relational_inverse(self):
    """ Check that a one2many stays consistent across users when lines
    are created through its inverse many2one. """
    thread = self.env.ref('test_new_api.discussion_0')
    thread_as_demo = thread.with_user(self.user_demo)

    # both users must see the same messages
    self.assertEqual(thread_as_demo.messages, thread.messages)

    # See YTI FIXME
    self.env.flush_all()
    self.env.invalidate_all()

    # create a message as demo user
    before = thread_as_demo.messages
    created = before.create({'discussion': thread.id})
    self.assertEqual(thread_as_demo.messages, before + created)
    self.assertEqual(thread_as_demo.messages, thread.messages)

    # create a message as superuser
    before = thread.messages
    created = before.create({'discussion': thread.id})
    self.assertEqual(thread.messages, before + created)
    self.assertEqual(thread_as_demo.messages, thread.messages)
def test_71_relational_inverse(self):
    """ Check that assigning a many2one is not overridden by prefetching. """
    Move = self.env['test_new_api.move']
    source_move = Move.create({})
    target_move = Move.create({})
    move_line = self.env['test_new_api.move_line'].create({'move_id': source_move.id})
    self.env.flush_all()
    self.env.invalidate_all()
    move_line.with_context(prefetch_fields=False).move_id

    # Assigning 'move_id' updates the one2many field based on it, and that
    # field has a domain. Evaluating the domain must not accidentally
    # override 'move_id' (by prefetch).
    move_line.move_id = target_move
    self.assertEqual(move_line.move_id, target_move)
def test_72_relational_inverse(self):
    """ Check that a many2one is flushed before searching on its inverse. """
    Move = self.env['test_new_api.move']
    first_move = Move.create({})
    second_move = Move.create({})

    # line.move_id must be flushed before the search
    move_line = self.env['test_new_api.move_line'].create({'move_id': first_move.id})
    found = Move.search([('line_ids', 'in', move_line.id)])
    self.assertEqual(found, first_move)

    # the update of line.move_id must be flushed before the search, too
    move_line.move_id = second_move
    found = Move.search([('line_ids', 'in', move_line.id)])
    self.assertEqual(found, second_move)
def test_73_relational_inverse(self):
    """ Check that the cache of a many2many keeps the order in which
    records are linked through its inverse. """
    Discussion = self.env['test_new_api.discussion']
    Category = self.env['test_new_api.category']
    disc_a, disc_b = Discussion.create([
        {'name': "discussion1"}, {'name': "discussion2"},
    ])
    cat_a, cat_b = Category.create([
        {'name': "category1"}, {'name': "category2"},
    ])

    # assumption: both recordsets hold the same ids in a different order,
    # and those ids end up in the same order once put in a set()
    forward = cat_a + cat_b
    backward = cat_b + cat_a
    self.assertNotEqual(forward.ids, backward.ids)
    self.assertEqual(list(set(forward.ids)), list(set(backward.ids)))

    # load disc_a.categories in cache; the write() below should update
    # that cache by appending forward.ids
    disc_a.categories
    forward.write({'discussions': [Command.link(disc_a.id)]})
    self.assertEqual(disc_a.categories.ids, forward.ids)

    # load disc_b.categories in cache; the write() below should update
    # that cache by appending backward.ids
    disc_b.categories
    backward.write({'discussions': [Command.link(disc_b.id)]})
    self.assertEqual(disc_b.categories.ids, backward.ids)
def test_80_copy(self):
    """ Check that copy() does not propagate translations to records
    given in the copy values. """
    thread = self.env.ref('test_new_api.discussion_0')
    translated_msg = self.env.ref('test_new_api.message_0_0')
    other_msg = self.env.ref('test_new_api.message_0_1')
    email = self.env.ref('test_new_api.emailmessage_0_0')
    self.assertEqual(email.message, translated_msg)

    self.env['res.lang']._activate_lang('fr_FR')

    def check_labels():
        # the translation remains on its message, and only there
        self.assertEqual(translated_msg.with_context(lang='fr_FR').label, 'bonjour')
        self.assertFalse(other_msg.label)

    # translate message.label through the email
    email.with_context(lang='fr_FR').label = "bonjour"
    check_labels()

    # setting the parent record should not copy its translations
    email.copy({'message': other_msg.id})
    check_labels()

    # setting a one2many should not copy translations on the lines
    thread.copy({'messages': [Command.set(other_msg.ids)]})
    check_labels()
def test_85_binary_guess_zip(self):
    """ Check that the demo user may store a ZIP file in a binary field. """
    from odoo.addons.base.tests.test_mimetypes import ZIP

    # Regular ZIP files can be uploaded by non-admin users
    BinaryAsDemo = self.env['test_new_api.binary_svg'].with_user(self.user_demo)
    zip_vals = {
        'name': 'Test without attachment',
        'image_wo_attachment': base64.b64decode(ZIP),
    }
    BinaryAsDemo.create(zip_vals)
def test_86_text_base64_guess_svg(self):
    """ Check that an SVG given as text is rejected for the demo user. """
    from odoo.addons.base.tests.test_mimetypes import SVG

    BinaryAsDemo = self.env['test_new_api.binary_svg'].with_user(self.user_demo)
    with self.assertRaises(UserError) as caught:
        BinaryAsDemo.create({
            'name': 'Test without attachment',
            'image_wo_attachment': SVG.decode("utf-8"),
        })
    # the message is checked once the context manager has caught the error
    error_message = caught.exception.args[0]
    self.assertEqual(error_message, 'Only admins can upload SVG files.')
def test_90_binary_svg(self):
    """ Check who may store an SVG in a binary field without attachment. """
    from odoo.addons.base.tests.test_mimetypes import SVG

    BinarySvg = self.env['test_new_api.binary_svg']

    def svg_vals():
        # a fresh dict of values for every create()
        return {
            'name': 'Test without attachment',
            'image_wo_attachment': SVG,
        }

    # This should work without problems
    BinarySvg.create(svg_vals())

    # And this gives error
    with self.assertRaises(UserError):
        BinarySvg.with_user(self.user_demo).create(svg_vals())
def test_91_binary_svg_attachment(self):
    """ Check the mimetype of an SVG stored as attachment, depending on
    the user who uploads it. """
    from odoo.addons.base.tests.test_mimetypes import SVG

    BinarySvg = self.env['test_new_api.binary_svg']

    def svg_vals():
        # a fresh dict of values for every create()
        return {
            'name': 'Test without attachment',
            'image_attachment': SVG,
        }

    def find_attachment(rec):
        # attachment that stores the field 'image_attachment' of rec
        return self.env['ir.attachment'].search([
            ('res_model', '=', rec._name),
            ('res_field', '=', 'image_attachment'),
            ('res_id', '=', rec.id),
        ])

    # This doesn't neuter SVG with admin
    admin_record = BinarySvg.create(svg_vals())
    self.assertEqual(find_attachment(admin_record).mimetype, 'image/svg+xml')

    # ...but this should be neutered with demo user
    demo_record = BinarySvg.with_user(self.user_demo).create(svg_vals())
    self.assertEqual(find_attachment(demo_record).mimetype, 'text/plain')
def test_92_binary_self_avatar_svg(self):
    """ An SVG avatar written by the demo user on himself must be neutered. """
    from odoo.addons.base.tests.test_mimetypes import SVG

    user = self.user_demo
    # User demo changes his own avatar
    user.with_user(user).image_1920 = SVG

    # The SVG file should have been neutered: check the mimetype of the
    # attachment that stores the partner's 'image_1920'
    Attachment = self.env['ir.attachment']
    avatar = Attachment.search([
        ('res_model', '=', user.partner_id._name),
        ('res_field', '=', 'image_1920'),
        ('res_id', '=', user.partner_id.id),
    ])
    self.assertEqual(avatar.mimetype, 'text/plain')
def test_93_monetary_related(self):
    """ Check the currency field on related monetary fields. """
    # check base field
    base_model = self.env['test_new_api.monetary_base']
    base_field = base_model._fields['amount']
    self.assertEqual(base_field.get_currency_field(base_model), 'base_currency_id')

    # every case is (model name, field name, expected currency field); all
    # those fields are related to 'monetary_id.amount'
    related_cases = (
        # related fields must use the field 'currency_id' or 'x_currency_id'
        ('test_new_api.monetary_related', 'amount', 'currency_id'),
        ('test_new_api.monetary_custom', 'x_amount', 'x_currency_id'),
        # inherited field must use the same field as its parent field
        ('test_new_api.monetary_inherits', 'amount', 'base_currency_id'),
    )
    for model_name, field_name, currency_field in related_cases:
        model = self.env[model_name]
        field = model._fields[field_name]
        self.assertEqual(field.related, 'monetary_id.amount')
        self.assertEqual(field.get_currency_field(model), currency_field)
def test_94_image(self):
    """ Test image fields: resizing, related and inverse fields, bin_size.

    Two PNG pictures are used: a "wide" one (4000x2000) and a "tall" one
    (2000x4000). They are assigned through create() and write() to the
    fields ``image``, ``image_128``, ``image_512``, ``image_256`` and
    ``image_64`` of model ``test_new_api.model_image``, and the dimensions
    of the resulting images are checked on every field. The end of the test
    covers reading with ``bin_size``, ``image_data_uri()``, invalid images
    and assignment on new records.
    """
    def make_png(width, height):
        """ Return a base64-encoded PNG picture of the given dimensions. """
        stream = io.BytesIO()
        Image.new('RGB', (width, height), '#4169E1').save(stream, 'PNG')
        return base64.b64encode(stream.getvalue())

    def assertSizes(record, expected):
        """ Check the dimensions of image fields on ``record``; ``expected``
        is a sequence of pairs ``(field_name, (width, height))``, and the
        fields are read in that order.
        """
        for fname, size in expected:
            picture = Image.open(io.BytesIO(base64.b64decode(record[fname])))
            self.assertEqual(picture.size, size, f'Incorrect size for {fname}')

    image_w = make_png(4000, 2000)
    image_h = make_png(2000, 4000)

    # Expected dimensions after assigning both 'image' and 'image_128'; in
    # reading order: 'image_128' (resize), 'image_512' (related store),
    # 'image_256' (related no store), 'image_64' (related store on column).
    resized_w = (       # width limited
        ('image_128', (128, 64)),
        ('image_512', (512, 256)),
        ('image_256', (256, 128)),
        ('image_64', (64, 32)),
    )
    resized_h = (       # height limited
        ('image_128', (64, 128)),
        ('image_512', (256, 512)),
        ('image_256', (128, 256)),
        ('image_64', (32, 64)),
    )
    # Expected dimensions after assigning one of the related fields only
    # (inverse): 'image' must hold the picture in its full dimensions.
    inverse_w = (
        ('image_512', (512, 256)),
        ('image', (4000, 2000)),
        ('image_256', (256, 128)),
        ('image_64', (64, 32)),
    )
    inverse_h = (
        ('image_512', (256, 512)),
        ('image', (2000, 4000)),
        ('image_256', (128, 256)),
        ('image_64', (32, 64)),
    )

    Model = self.env['test_new_api.model_image']

    # test create with the wide picture: no resize on 'image'
    record = Model.create({
        'name': 'image',
        'image': image_w,
        'image_128': image_w,
    })
    self.assertEqual(record.image, image_w)
    assertSizes(record, resized_w)

    # test write with the tall picture: no resize on 'image'
    record.write({
        'image': image_h,
        'image_128': image_h,
    })
    self.assertEqual(record.image, image_h)
    assertSizes(record, resized_h)

    # test create with the tall picture: no resize on 'image'
    record = Model.create({
        'name': 'image',
        'image': image_h,
        'image_128': image_h,
    })
    self.assertEqual(record.image, image_h)
    assertSizes(record, resized_h)

    # test write with the wide picture: no resize on 'image'
    record.write({
        'image': image_w,
        'image_128': image_w,
    })
    self.assertEqual(record.image, image_w)
    assertSizes(record, resized_w)

    # test create inverse store
    record = Model.create({
        'name': 'image',
        'image_512': image_w,
    })
    assertSizes(record, inverse_w)

    # test write inverse store
    record.write({
        'image_512': image_h,
    })
    assertSizes(record, inverse_h)

    # test create inverse no store
    record = Model.with_context(image_no_postprocess=True).create({
        'name': 'image',
        'image_256': image_w,
    })
    assertSizes(record, inverse_w)

    # test write inverse no store
    record.write({
        'image_256': image_h,
    })
    assertSizes(record, inverse_h)

    # test create inverse stored column
    record = Model.with_context(image_no_postprocess=True).create({
        'name': 'image',
        'image_64': image_w,
    })
    assertSizes(record, inverse_w)

    # test write inverse stored column
    record.write({
        'image_64': image_h,
    })
    assertSizes(record, inverse_h)

    # test bin_size
    record_bin_size = record.with_context(bin_size=True)
    self.assertEqual(record_bin_size.image, b'31.54 Kb')
    self.assertEqual(record_bin_size.image_512, b'1.02 Kb')
    self.assertEqual(record_bin_size.image_256, b'424.00 bytes')
    # non-attachment binary fields: value returned as str in a different
    # form, because coming from PostgreSQL instead of filestore
    self.assertEqual(record_bin_size.image_64, '148 bytes')

    # ensure image_data_uri works (value must be bytes and not string); the
    # 30 first characters are the data URI header followed by the 8 first
    # characters of the base64 content checked just above
    self.assertEqual(record.image_256[:8], b'iVBORw0K')
    self.assertEqual(image_data_uri(record.image_256)[:30], 'data:image/png;base64,iVBORw0K')

    # ensure invalid image raises
    with self.assertRaises(UserError), self.cr.savepoint():
        record.write({
            'image': 'invalid image',
        })

    # assignment of invalid image on new record does nothing, the value is
    # taken from origin instead (use-case: onchange)
    new_record = record.new(origin=record)
    new_record.image = '31.54 Kb'
    self.assertEqual(record.image, image_h)
    self.assertEqual(new_record.image, image_h)

    # assignment to new record with origin should not do any query
    with self.assertQueryCount(0):
        new_record.image = image_w
def test_95_binary_bin_size_create(self):
    """ Read binary fields of freshly created records in environments with
    and without ``bin_size`` in the context.

    Whatever the context used for creating the record, reading it without
    ``bin_size`` (or with ``bin_size=False``) must return the content, and
    reading it with ``bin_size=True`` must return the human-readable size.
    """
    Model = self.env['test_new_api.model_binary']
    binary_value = base64.b64encode(b'content')
    binary_size = b'7.00 bytes'

    def assertBinaryValue(record, value):
        for field in ('binary', 'binary_related_store', 'binary_related_no_store'):
            self.assertEqual(record[field], value, f'Incorrect result for {field}')

    # created and first read without context
    record = Model.create({'binary': binary_value})
    record_no_bin_size = record.with_context(bin_size=False)
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record_bin_size, binary_size)

    # created and first read with bin_size=False
    record_no_bin_size = Model.with_context(bin_size=False).create({'binary': binary_value})
    # browse the record that was just created, not the one of the case above
    record = Model.browse(record_no_bin_size.id)
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_bin_size, binary_size)

    # created and first read with bin_size=True
    record_bin_size = Model.with_context(bin_size=True).create({'binary': binary_value})
    # browse the record that was just created, not the one of the case above
    record = Model.browse(record_bin_size.id)
    record_no_bin_size = record.with_context(bin_size=False)

    assertBinaryValue(record_bin_size, binary_size)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record, binary_value)

    # created without context, then cache invalidated through an
    # environment with bin_size=True
    record = Model.create({'binary': binary_value})
    record.with_context(bin_size=True).env.invalidate_all()
    record_no_bin_size = record.with_context(bin_size=False)
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record_bin_size, binary_size)

    # check computed binary field with arbitrary Python value: the value
    # is the same with and without bin_size
    record = Model.create({})
    record_no_bin_size = record.with_context(bin_size=False)
    record_bin_size = record.with_context(bin_size=True)

    expected_value = [(record.id, False)]
    self.assertEqual(record.binary_computed, expected_value)
    self.assertEqual(record_no_bin_size.binary_computed, expected_value)
    self.assertEqual(record_bin_size.binary_computed, expected_value)
def test_95_binary_bin_size_write(self):
    """ Read binary fields after write() in environments with and without
    ``bin_size`` in the context.

    Whatever the context used for writing, reading the record without
    ``bin_size`` (or with ``bin_size=False``) must return the content, and
    reading it with ``bin_size=True`` must return the human-readable size.
    Every case works on its own new record, and rebinds all three record
    variables so that no assertion looks at the record of a previous case.
    """
    Model = self.env['test_new_api.model_binary']
    binary_value = base64.b64encode(b'content')
    binary_size = b'7.00 bytes'

    def assertBinaryValue(record, value):
        for field in ('binary', 'binary_related_store', 'binary_related_no_store'):
            self.assertEqual(record[field], value, f'Incorrect result for {field}')

    # created and written without context
    record = Model.create({})
    record.write({'binary': binary_value})
    record_no_bin_size = record.with_context(bin_size=False)
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record_bin_size, binary_size)

    # created without context, written with bin_size=False
    record = Model.create({})
    record_no_bin_size = record.with_context(bin_size=False)
    record_no_bin_size.write({'binary': binary_value})
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_bin_size, binary_size)

    # created without context, written with bin_size=True
    record = Model.create({})
    record_bin_size = record.with_context(bin_size=True)
    record_bin_size.write({'binary': binary_value})
    record_no_bin_size = record.with_context(bin_size=False)

    assertBinaryValue(record_bin_size, binary_size)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record, binary_value)

    # created and written without context, then cache invalidated through
    # an environment with bin_size=True
    record = Model.create({})
    record.write({'binary': binary_value})
    record.with_context(bin_size=True).env.invalidate_all()
    record_no_bin_size = record.with_context(bin_size=False)
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record_bin_size, binary_size)

    # created and written without context, then cache invalidated through
    # an environment without bin_size
    record = Model.create({})
    record.write({'binary': binary_value})
    record.env.invalidate_all()
    record_no_bin_size = record.with_context(bin_size=False)
    record_bin_size = record.with_context(bin_size=True)

    assertBinaryValue(record, binary_value)
    assertBinaryValue(record_no_bin_size, binary_value)
    assertBinaryValue(record_bin_size, binary_size)
def test_96_order_m2o(self):
    """ Check sorted() on cities before and after renaming a country. """
    Country = self.env['test_new_api.country']
    City = self.env['test_new_api.city']

    belgium, congo = Country.create([
        {'name': "Duchy of Brabant"},
        {'name': "Congo"},
    ])
    cities = City.create([
        {'name': "Brussels", 'country_id': belgium.id},
        {'name': "Kinshasa", 'country_id': congo.id},
    ])

    # cities are sorted by country_id, name
    names_before = cities.sorted().mapped('name')
    self.assertEqual(names_before, ["Kinshasa", "Brussels"])

    # change order of countries, and check sorted()
    belgium.name = "Belgium"
    names_after = cities.sorted().mapped('name')
    self.assertEqual(names_after, ["Brussels", "Kinshasa"])
def test_97_ir_rule_m2m_field(self):
    """Ensures m2m fields can't be read if the left records can't be read.
    Also makes sure reading m2m doesn't take more queries than necessary."""
    def read_tags_uncached(rec):
        # trick: if value is in cache, read() does not make any query, so
        # the field is invalidated right before reading it
        rec.invalidate_recordset(['tags'])
        rec.read(['tags'])

    def add_rule(rec, domain_force):
        # create a record rule with the given domain on the model of `rec`
        self.env['ir.rule'].create({
            'model_id': self.env['ir.model']._get(rec._name).id,
            'domain_force': domain_force,
        })

    tag = self.env['test_new_api.multi.tag'].create({})
    line = self.env['test_new_api.multi.line'].create({
        'name': 'image',
        'tags': [Command.link(tag.id)],
    })

    # only one query as admin: reading pivot table
    with self.assertQueryCount(1):
        read_tags_uncached(line)

    new_user = self.env['res.users'].create({'name': "user", 'login': "user"})
    line_as_user = line.with_user(new_user)

    # prep the following query count by caching access check related data
    read_tags_uncached(line_as_user)

    # only one query as user: reading pivot table
    with self.assertQueryCount(1):
        read_tags_uncached(line_as_user)

    # create a passing ir.rule
    add_rule(line, "[('id', '=', %d)]" % line.id)

    # prep the following query count by caching access check related data
    read_tags_uncached(line_as_user)

    # still only 1 query: reading pivot table
    # access rules are checked in python in this case
    with self.assertQueryCount(1):
        read_tags_uncached(line_as_user)

    # create a blocking ir.rule
    add_rule(line, "[('id', '!=', %d)]" % line.id)

    # ensure ir.rule is applied even when reading m2m
    with self.assertRaises(AccessError):
        line_as_user.read(['tags'])
def test_98_prefetch_translate(self):
    """ Check attribute 'prefetch' on fields of model 'test_new_api.prefetch'. """
    Model = self.registry['test_new_api.prefetch']

    prefetched = (
        # translated '_rec_name' field should be prefetched
        'name',
        # translated fields should be prefetch=True by default
        'description',
        'html_description',
    )
    for fname in prefetched:
        self.assertTrue(getattr(Model, fname).prefetch)

    # parameter 'prefetch' can be always overridden
    for fname in ('rare_description', 'rare_html_description'):
        self.assertFalse(getattr(Model, fname).prefetch)
def test_98_unlink_recompute(self):
    """ Unlinking a line as the demo user must recompute the move's quantity. """
    Move = self.env['test_new_api.move']
    move = Move.create({
        'line_ids': [(0, 0, {'quantity': 42})],
    })
    move_line = move.line_ids
    self.assertEqual(move.quantity, 42)

    # create an ir.rule for lines that uses move.quantity
    line_model = self.env['ir.model']._get(move_line._name)
    self.env['ir.rule'].create({
        'model_id': line_model.id,
        'domain_force': "[('move_id.quantity', '>=', 0)]",
    })

    # unlink the line, and check the recomputation of move.quantity
    move_line.with_user(self.user_demo).unlink()
    self.assertEqual(move.quantity, 0)
def test_99_prefetch_group(self):
    """ Check the queries made when reading fields that belong to different
    prefetch groups of model 'test_new_api.prefetch'.
    """
    # expected queries, one per prefetch group
    query_default_group = """
SELECT "test_new_api_prefetch"."id",
"test_new_api_prefetch"."name"->>%s,
"test_new_api_prefetch"."description"->>%s,
"test_new_api_prefetch"."html_description"->>%s,
"test_new_api_prefetch"."create_uid",
"test_new_api_prefetch"."create_date",
"test_new_api_prefetch"."write_uid",
"test_new_api_prefetch"."write_date"
FROM "test_new_api_prefetch"
WHERE ("test_new_api_prefetch"."id" IN %s)
"""
    query_harry_potter = """
SELECT
"test_new_api_prefetch"."id",
"test_new_api_prefetch"."harry",
"test_new_api_prefetch"."hermione",
"test_new_api_prefetch"."ron"
FROM "test_new_api_prefetch"
WHERE ("test_new_api_prefetch"."id" IN %s)
"""
    query_hansel_gretel = """
SELECT
"test_new_api_prefetch"."id",
"test_new_api_prefetch"."hansel",
"test_new_api_prefetch"."gretel"
FROM "test_new_api_prefetch"
WHERE ("test_new_api_prefetch"."id" IN %s)
"""

    Model = self.env['test_new_api.prefetch']
    records = Model.create([{} for _ in range(10)])
    self.env.flush_all()
    self.env.invalidate_all()

    with self.assertQueries([query_default_group]):
        # fetch all fields with prefetch=True
        records.mapped('name')

    with self.assertQueries([query_harry_potter]):
        # fetch all fields with prefetch='Harry Potter'
        records.mapped('harry')
        # the two others are fetched already
        records.mapped('hermione')
        records.mapped('ron')

    with self.assertQueries([query_hansel_gretel]):
        # fetch all fields with prefetch='Hansel and Gretel'
        records.mapped('hansel')
        # fetched already
        records.mapped('gretel')

    # starting from an empty cache, one query is expected per group read
    self.env.invalidate_all()
    with self.assertQueryCount(4):
        # fetch all fields with prefetch=True
        records.mapped('name')
        # fetch all fields with prefetch='Hansel and Gretel'
        records.mapped('hansel')
        # fetch all fields with prefetch='Harry Potter'
        records.mapped('harry')
        # fetch that field only
        records.mapped('rare_description')
def test_cache_key_invalidation(self):
    """ Check field 'foo' of a record before and after the company of its
    environment's user has changed.
    """
    main_company = self.env.ref('base.main_company')
    other_company = self.env['res.company'].create({'name': 'A'})
    allowed_company_ids = [main_company.id, other_company.id]
    user = self.env['res.users'].create({
        'name': 'Foo', 'login': 'foo', 'company_id': main_company.id,
        'company_ids': [Command.set(allowed_company_ids)],
    })

    # this uses the main company
    Model = self.env['test_new_api.company'].with_user(user)
    record = Model.create({
        'foo': 'main',
    })
    self.assertEqual(record.env.company, main_company)
    self.assertEqual(record.foo, 'main')

    # change the user's company, so we implicitly switch to the other one
    user.company_id = other_company
    self.assertEqual(record.env.company, other_company)
    self.assertEqual(record.foo, False)
def test_field_set_prefetch(self):
    """ Assigning a field record by record must keep the prefetch set. """
    Model = self.env['test_new_api.prefetch']
    records = Model.create([
        {'line_ids': [Command.create({})]}
        for _ in range(4)
    ])

    # This test ensures that the prefetch set is preserved when using Field.__set__(),
    # which calls BaseModel.write(). The prefetch set is important for write() to
    # ensure that method modified() can batch the fetching of relational fields.
    # In this case, modifying 'harry' on a record should add a related field to
    # recompute through the one2many field 'line_ids', which we expect to be fetched
    # in batch with the prefetch set.

    # one query for modified, one for the records, one for their lines
    self.env.invalidate_all()
    with self.assertQueryCount(3):
        for value, record in enumerate(records, start=1):
            record.harry = value

    # same result by calling write() directly
    self.env.invalidate_all()
    with self.assertQueryCount(3):
        for value, record in enumerate(records, start=2):
            record.write({'harry': value})
class TestX2many(TransactionExpressionCase):
@classmethod
def setUpClass(cls):
    """ Set ``cls.user_portal`` and ``cls.partner_portal``: reuse the user
    with login 'portal' when it exists, otherwise create it with its partner.
    """
    super().setUpClass()
    Users = cls.env['res.users']
    cls.user_portal = Users.sudo().search([('login', '=', 'portal')])
    cls.partner_portal = cls.user_portal.partner_id
    if cls.user_portal:
        return

    # no such user found: create the partner and the user; the minimal
    # password length is set to 4 before creating the user
    cls.env['ir.config_parameter'].sudo().set_param('auth_password_policy.minlength', 4)
    cls.partner_portal = cls.env['res.partner'].create({
        'name': 'Joel Willis',
        'email': 'joel.willis63@example.com',
    })
    portal_group = cls.env.ref('base.group_portal')
    cls.user_portal = Users.with_context(no_reset_password=True).create({
        'login': 'portal',
        'password': 'portal',
        'partner_id': cls.partner_portal.id,
        'groups_id': [Command.set([portal_group.id])],
    })
def test_definition_many2many(self):
    """ Test the definition of inherited many2many fields. """
    # (model name, expected relation table, expected first column); the
    # second column is the same for both models
    expectations = (
        (
            'test_new_api.multi.line',
            'test_new_api_multi_line_test_new_api_multi_tag_rel',
            'test_new_api_multi_line_id',
        ),
        (
            'test_new_api.multi.line2',
            'test_new_api_multi_line2_test_new_api_multi_tag_rel',
            'test_new_api_multi_line2_id',
        ),
    )
    for model_name, relation, column1 in expectations:
        tags_field = self.env[model_name]._fields['tags']
        self.assertEqual(tags_field.relation, relation)
        self.assertEqual(tags_field.column1, column1)
        self.assertEqual(tags_field.column2, 'test_new_api_multi_tag_id')
def test_10_ondelete_many2many(self):
    """Test A can't be deleted when used on the relation."""
    rec_a = self.env['test_new_api.model_a'].create({'name': 'a'})
    rec_b = self.env['test_new_api.model_b'].create({'name': 'b'})
    rec_a.write({
        'a_restricted_b_ids': [Command.set(rec_b.ids)],
    })

    # deleting A violates the relation; the savepoint rolls the failed
    # statement back, and the SQL error log is muted
    with self.assertRaises(psycopg2.IntegrityError), \
            mute_logger('odoo.sql_db'), self.cr.savepoint():
        rec_a.unlink()

    # Test B is still cascade.
    rec_b.unlink()
    self.assertFalse(rec_b.exists())
def test_11_ondelete_many2many(self):
    """Test B can't be deleted when used on the relation."""
    rec_a = self.env['test_new_api.model_a'].create({'name': 'a'})
    rec_b = self.env['test_new_api.model_b'].create({'name': 'b'})
    rec_a.write({
        'b_restricted_b_ids': [Command.set(rec_b.ids)],
    })

    # deleting B violates the relation; the savepoint rolls the failed
    # statement back, and the SQL error log is muted
    with self.assertRaises(psycopg2.IntegrityError), \
            mute_logger('odoo.sql_db'), self.cr.savepoint():
        rec_b.unlink()

    # Test A is still cascade.
    rec_a.unlink()
    self.assertFalse(rec_a.exists())
def test_12_active_test_one2many(self):
    """ Check one2many field 'children_ids' with active and inactive
    children, depending on 'active_test' in the context.
    """
    Model = self.env['test_new_api.model_active_field']
    parent = Model.create({})
    self.assertFalse(parent.children_ids)

    def check_children(active, everything):
        # `active` is expected without context and with active_test=True,
        # `everything` is expected with active_test=False
        self.assertEqual(parent.children_ids, active)
        self.assertEqual(parent.with_context(active_test=True).children_ids, active)
        self.assertEqual(parent.with_context(active_test=False).children_ids, everything)

    # create with implicit active_test=True in context
    child1, child2 = Model.create([
        {'parent_id': parent.id, 'active': True},
        {'parent_id': parent.id, 'active': False},
    ])
    check_children(child1, child1 + child2)

    # create with active_test=False in context
    child3, child4 = Model.with_context(active_test=False).create([
        {'parent_id': parent.id, 'active': True},
        {'parent_id': parent.id, 'active': False},
    ])
    check_children(child1 + child3, child1 + child2 + child3 + child4)

    # replace active children
    parent.write({'children_ids': [Command.set([child1.id])]})
    check_children(child1, child1 + child2 + child4)

    # replace all children
    parent.with_context(active_test=False).write({'children_ids': [Command.set([child1.id])]})
    check_children(child1, child1)

    # check recomputation of inactive records
    parent.write({'children_ids': [Command.set(child4.ids)]})
    self.assertTrue(child4.parent_active)
    parent.active = False
    self.assertFalse(child4.parent_active)
def test_12_active_test_one2many_with_context(self):
    """ Check one2many fields 'children_ids', 'all_children_ids' and
    'active_children_ids', with and without 'active_test' in the context.
    """
    Model = self.env['test_new_api.model_active_field']
    parent = Model.create({})
    all_children = Model.create([
        {'parent_id': parent.id, 'active': True},
        {'parent_id': parent.id, 'active': False},
    ])
    act_children = all_children[0]

    # (field, value without context, with active_test=True, with active_test=False)
    expectations = (
        ('children_ids', act_children, act_children, all_children),
        ('all_children_ids', all_children, all_children, all_children),
        ('active_children_ids', act_children, act_children, act_children),
    )
    for fname, default, tested, untested in expectations:
        self.assertEqual(parent[fname], default)
        self.assertEqual(parent.with_context(active_test=True)[fname], tested)
        self.assertEqual(parent.with_context(active_test=False)[fname], untested)

    # check read(): same values once the cache has been invalidated
    self.env.invalidate_all()
    for fname, default, _tested, _untested in expectations:
        self.assertEqual(parent[fname], default)

    self.env.invalidate_all()
    for fname, _default, _tested, untested in expectations:
        self.assertEqual(parent.with_context(active_test=False)[fname], untested)
def test_12_active_test_one2many_search(self):
    """ Search on one2many fields with an active child 'A' and an inactive
    child 'B'.

    Field 'children_ids' must only match the active child, while field
    'all_children_ids' must match both children. Each field is searched
    through a sub-field ('.name'), by name ('=') and with 'child_of'.
    """
    # TODO use _search, filtered domains behaves strangely for hierarchies
    Model = self.env['test_new_api.model_active_field']
    parent = Model.create({
        'children_ids': [
            Command.create({'name': 'A', 'active': True}),
            Command.create({'name': 'B', 'active': False}),
        ],
    })

    # a one2many field without context does not match its inactive children
    self.assertIn(parent, Model.search([('children_ids.name', '=', 'A')]))
    self.assertNotIn(parent, Model.search([('children_ids.name', '=', 'B')]))
    # Same result when it used name_search
    self.assertIn(parent, Model.search([('children_ids', '=', 'A')]))
    self.assertNotIn(parent, Model.search([('children_ids', '=', 'B')]))
    # Same result with the child_of operator
    self.assertIn(parent, Model.search([('children_ids', 'child_of', 'A')]))
    self.assertNotIn(parent, Model.search([('children_ids', 'child_of', 'B')]))

    # a one2many field with active_test=False matches its inactive children
    self.assertIn(parent, Model.search([('all_children_ids.name', '=', 'A')]))
    self.assertIn(parent, Model.search([('all_children_ids.name', '=', 'B')]))
    # Same result when it used name_search
    self.assertIn(parent, Model.search([('all_children_ids', '=', 'A')]))
    self.assertIn(parent, Model.search([('all_children_ids', '=', 'B')]))
    # Same result with the child_of operator
    self.assertIn(parent, Model.search([('all_children_ids', 'child_of', 'A')]))
    self.assertIn(parent, Model.search([('all_children_ids', 'child_of', 'B')]))
def test_12_active_test_many2many_search(self):
    """ Search on many2many fields 'relatives_ids' and 'all_relatives_ids'
    with an active relative 'A' and an inactive relative 'B'.
    """
    # TODO use _search, filtered domains behaves strangely for hierarchies
    Model = self.env['test_new_api.model_active_field']
    parent = Model.create({
        'relatives_ids': [
            Command.create({'name': 'A', 'active': True}),
            Command.create({'name': 'B', 'active': False}),
        ],
    })
    child_a, child_b = parent.with_context(active_test=False).relatives_ids

    def check(field_path, operator, value, matched):
        # search with a single condition, and check whether parent is found
        found = Model.search([(field_path, operator, value)])
        if matched:
            self.assertIn(parent, found)
        else:
            self.assertNotIn(parent, found)

    # a many2many field without context does not match its inactive children
    check('relatives_ids.name', '=', 'A', True)
    check('relatives_ids.name', '=', 'B', False)
    # Same result when it used name_search
    check('relatives_ids', '=', 'A', True)
    check('relatives_ids', '=', 'B', False)
    # Same result with the child_of operator
    check('relatives_ids', 'child_of', child_a.id, True)
    check('relatives_ids', 'child_of', 'A', True)
    check('relatives_ids', 'child_of', child_b.id, False)
    check('relatives_ids', 'child_of', 'B', False)

    # a many2many field with active_test=False matches its inactive children
    check('all_relatives_ids.name', '=', 'A', True)
    check('all_relatives_ids.name', '=', 'B', True)
    # Same result when it used name_search
    check('all_relatives_ids', '=', 'A', True)
    check('all_relatives_ids', '=', 'B', True)
    # Same result with the child_of operator
    check('all_relatives_ids', 'child_of', child_a.id, True)
    check('all_relatives_ids', 'child_of', 'A', True)
    check('all_relatives_ids', 'child_of', child_b.id, True)
    check('all_relatives_ids', 'child_of', 'B', True)
def test_search_many2many(self):
    """ Tests search on many2many fields. """
    Tag = self.env['test_new_api.multi.tag']
    tagA = Tag.create({})
    tagB = Tag.create({})
    tagC = Tag.create({})

    Line = self.env['test_new_api.multi.line']
    recW = Line.create({})
    recX = Line.create({'tags': [Command.link(tagA.id)]})
    recY = Line.create({'tags': [Command.link(tagB.id)]})
    recZ = Line.create({'tags': [Command.link(tagA.id), Command.link(tagB.id)]})
    recs = recW + recX + recY + recZ

    # condition restricting the searches below to the records created here
    among_recs = ('id', 'in', recs.ids)

    # test 'in': pairs (tag ids, expected records)
    in_cases = (
        ((tagA + tagB).ids, recX + recY + recZ),
        (tagA.ids, recX + recZ),
        (tagB.ids, recY + recZ),
        (tagC.ids, recs.browse()),
        ([], recs.browse()),
    )
    for tag_ids, expected in in_cases:
        found = self._search(recs, [('tags', 'in', tag_ids)])
        self.assertEqual(found, expected)

    # test 'not in': pairs (tag ids, expected records)
    not_in_cases = (
        ((tagA + tagB).ids, recs - recX - recY - recZ),
        (tagA.ids, recs - recX - recZ),
        (tagB.ids, recs - recY - recZ),
        (tagC.ids, recs),
        ([], recs),
    )
    for tag_ids, expected in not_in_cases:
        found = self._search(recs, [among_recs, ('tags', 'not in', tag_ids)])
        self.assertEqual(found, expected)

    # special case: compare with False
    found = self._search(recs, [among_recs, ('tags', '=', False)])
    self.assertEqual(found, recW)
    found = self._search(recs, [among_recs, ('tags', '!=', False)])
    self.assertEqual(found, recs - recW)
def test_search_one2many(self):
    """ Check the 'in', 'not in', '=' and '!=' operators on a one2many field. """
    Multi = self.env['test_new_api.multi']
    recX = Multi.create({'lines': [Command.create({}), Command.create({})]})
    recY = Multi.create({'lines': [Command.create({})]})
    recZ = Multi.create({})
    recs = recX + recY + recZ
    nothing = recs.browse()

    # line1 and line2 belong to recX, line3 belongs to recY
    line1, line2, line3 = recs.lines
    # line4 belongs to a record that is not in recs
    line4 = recs.create({'lines': [Command.create({})]}).lines
    # line0 is created without any parent record
    line0 = line4.create({})

    cases = [
        # test 'in'
        ('in', (line1 + line2 + line3 + line4).ids, recX + recY),
        ('in', (line1 + line3 + line4).ids, recX + recY),
        ('in', (line1 + line4).ids, recX),
        ('in', line4.ids, nothing),
        ('in', [], nothing),
        # test 'not in'
        ('not in', (line1 + line2 + line3).ids, recs - recX - recY),
        ('not in', (line1 + line3).ids, recs - recX - recY),
        ('not in', line1.ids, recs - recX),
        ('not in', (line1 + line4).ids, recs - recX),
        ('not in', line4.ids, recs),
        ('not in', [], recs),
        # test 'not in' where the lines contain NULL values
        ('not in', (line1 + line0).ids, recs - recX),
        ('not in', line0.ids, recs),
        # special case: compare with False
        ('=', False, recZ),
        ('!=', False, recs - recZ),
    ]
    for operator, value, expected in cases:
        domain = [('id', 'in', recs.ids), ('lines', operator, value)]
        found = self._search(recs, domain)
        self.assertEqual(found, expected)
def test_create_batch_m2m(self):
    """ Batch-create three lines with three new tags each, and check the counts. """
    vals_list = [
        {'tags': [Command.create({'name': str(num)}) for num in range(3)]}
        for _ in range(3)
    ]
    lines = self.env['test_new_api.multi.line'].create(vals_list)

    self.assertEqual(len(lines), 3)
    for line in lines:
        self.assertEqual(len(line.tags), 3)
def test_custom_m2m(self):
    """ Create a non-stored custom many2many field, then check it can be unlinked. """
    partner_model_id = self.env['ir.model']._get_id('res.partner')
    field_vals = {
        'name': 'x_foo',
        'field_description': 'Foo',
        'model_id': partner_model_id,
        'ttype': 'many2many',
        'relation': 'res.country',
        'store': False,
    }
    custom_field = self.env['ir.model.fields'].create(field_vals)

    unlinked = custom_field.unlink()
    self.assertTrue(unlinked)
def test_custom_m2m_related(self):
    """ Create a stored, related custom many2many field, then check it can be unlinked. """
    # this checks the ondelete of a related many2many field
    partner_model_id = self.env['ir.model']._get_id('res.partner')
    field_vals = {
        'name': 'x_foo',
        'field_description': 'Foo',
        'model_id': partner_model_id,
        'ttype': 'many2many',
        'relation': 'res.partner.category',
        'related': 'category_id',
        'readonly': True,
        'store': True,
    }
    custom_field = self.env['ir.model.fields'].create(field_vals)

    unlinked = custom_field.unlink()
    self.assertTrue(unlinked)
@mute_logger('odoo.addons.base.models.ir_model')
@users('portal')
def test_sudo_commands(self):
    """Test manipulating a x2many field using Commands with `sudo` or with another user (`with_user`)
    is not allowed when the destination model is flagged `_allow_sudo_commands = False` and the transaction user
    does not have the required access rights.

    This test asserts an AccessError is raised
    when a user attempts to pass Commands to a One2many and Many2many field
    targeting a model flagged with `_allow_sudo_commands = False`
    while using an environment with `sudo()` or `with_user(admin_user)`.

    The `with_user` cases are edge cases in some business code, where a more-privileged user is used
    temporarily to perform an action, such as:
    - `Documents.with_user(share.create_uid)`
    - `request.env['sign.request'].with_user(contract.hr_responsible_id).sudo()`
    """
    admin_user = self.env.ref('base.user_admin')
    my_user = self.env.user.sudo(False)

    # 1. one2many field `res.partner.user_ids`
    # Sanity checks
    # `res.users`, the model targeted by the commands below, must be flagged
    # as `_allow_sudo_commands = False`, otherwise the test is pointless
    self.assertEqual(self.env['res.users']._allow_sudo_commands, False)
    # in case the type of `res.partner.user_ids` changes in a future release.
    # if `res.partner.user_ids` is no longer a one2many, this test must be adapted.
    self.assertEqual(self.env['res.partner']._fields['user_ids'].type, 'one2many')

    my_partner = my_user.partner_id

    # Each case runs twice: once as the admin user, once in sudo mode.
    # The loop targets are deliberately named differently from `my_partner`,
    # which is used to build the iterable.
    for Partner, partner in [
        (self.env['res.partner'].with_user(admin_user), my_partner.with_user(admin_user)),
        (self.env['res.partner'].sudo(), my_partner.sudo()),
    ]:
        # 1.0 Command.CREATE
        # Case: a public/portal user creating a new user with arbitrary values
        with self.assertRaisesRegex(AccessError, "not allowed to create 'User'"):
            Partner.create({
                'name': 'foo',
                'user_ids': [Command.create({
                    'login': 'foo',
                    'password': 'foo',
                })],
            })

        # 1.1 Command.UPDATE
        # Case: a public/portal user updating their own user to add themselves to a group
        with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"):
            partner.write({
                'user_ids': [Command.update(partner.user_ids[0].id, {
                    'groups_id': [self.env.ref('base.group_system').id],
                })],
            })

        # 1.2 Command.DELETE
        # Case: a public/portal user deleting their own user to mess with the database
        with self.assertRaisesRegex(AccessError, "not allowed to delete 'User'"):
            partner.write({
                'user_ids': [Command.delete(partner.user_ids[0].id)],
            })

        # 1.3 Command.UNLINK
        # Case: a public/portal user unlinking their user from their partner to mess with the database
        with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"):
            partner.write({
                'user_ids': [Command.unlink(partner.user_ids[0].id)],
            })

        # 1.4 Command.LINK
        # Case: a public/portal user changing the `partner_id` of an admin,
        # to change the email address of the user and ask for a reset password.
        with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"):
            partner.write({
                'user_ids': [Command.link(admin_user.id)],
            })

        # 1.5 Command.CLEAR
        # Case: a public/portal user unlinking all the users of their partner just to mess with the database
        with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"):
            partner.write({
                'user_ids': [Command.clear()],
            })

        # 1.6 Command.SET
        # Case: a public/portal user changing the `partner_id` of an admin,
        # to change the email address of the user and ask for a reset password.
        with self.assertRaisesRegex(AccessError, "not allowed to modify 'User'"):
            partner.write({
                'user_ids': [Command.set([admin_user.id])],
            })

    # 2. many2many field `test_new_api.user.group_ids`
    # Sanity checks
    # `test_new_api.group`, the model targeted by the commands below, must be flagged
    # as `_allow_sudo_commands = False`, otherwise the test is pointless
    self.assertEqual(self.env['test_new_api.group']._allow_sudo_commands, False)
    # in case the type of `test_new_api.user.group_ids` changes in a future release.
    # if `test_new_api.user.group_ids` is no longer a many2many, this test must be adapted.
    self.assertEqual(self.env['test_new_api.user']._fields['group_ids'].type, 'many2many')

    # Fixtures are created as admin, then switched back to the current user.
    public_group = self.env['test_new_api.group'].with_user(admin_user).create({
        'name': 'public'
    }).with_user(self.env.user)
    my_test_user = self.env['test_new_api.user'].with_user(admin_user).create({
        'name': 'foo',
        'group_ids': [public_group.id],
    }).with_user(self.env.user)

    # Same two privileged environments as in section 1.
    for User, user in [
        (self.env['test_new_api.user'].with_user(admin_user), my_test_user.with_user(admin_user)),
        (self.env['test_new_api.user'].sudo(), my_test_user.sudo()),
    ]:
        # 2.0 Command.CREATE
        # Case: creating a new group through the `group_ids` of a new user
        with self.assertRaisesRegex(AccessError, "not allowed to create 'test_new_api.group'"):
            User.create({
                'name': 'foo',
                'group_ids': [Command.create({})],
            })

        # 2.1 Command.UPDATE
        # Case: updating a group already linked to the user
        with self.assertRaisesRegex(AccessError, "not allowed to modify 'test_new_api.group'"):
            user.write({
                'group_ids': [Command.update(user.group_ids[0].id, {})],
            })

        # 2.2 Command.DELETE
        # Case: deleting a group already linked to the user
        with self.assertRaisesRegex(AccessError, "not allowed to delete 'test_new_api.group'"):
            user.write({
                'group_ids': [Command.delete(user.group_ids[0].id)],
            })
class TestHtmlField(TransactionCase):
def setUp(self):
    """ Run the parent set-up, then store the ``test_new_api.mixed`` model on ``self.model``. """
    super().setUp()
    mixed_model = self.env['test_new_api.mixed']
    self.model = mixed_model
def test_00_sanitize(self):
self.assertEqual(self.model._fields['comment1'].sanitize, False)
self.assertEqual(self.model._fields['comment2'].sanitize_attributes, True)
self.assertEqual(self.model._fields['comment2'].strip_classes, False)
self.assertEqual(self.model._fields['comment3'].sanitize_attributes, True)
self.assertEqual(self.model._fields['comment3'].strip_classes, True)
some_ugly_html = """Oops this should maybe be sanitized
% if object.some_field and not object.oriented:
% if object.other_field:
${object.mako_thing}
|
This is some html.
% endif
%if object.dummy_field:
Youpie
%endif"""
record = self.model.create({
'comment1': some_ugly_html,
'comment2': some_ugly_html,
'comment3': some_ugly_html,
'comment4': some_ugly_html,
})
self.assertEqual(record.comment1, some_ugly_html, 'Error in HTML field: content was sanitized but field has sanitize=False')
self.assertIn('