357 lines
16 KiB
Python
357 lines
16 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
|
||
|
from odoo.exceptions import AccessError
|
||
|
from odoo.tests.common import TransactionCase, tagged
|
||
|
from odoo.tools import mute_logger
|
||
|
from odoo import Command
|
||
|
|
||
|
|
||
|
class TestORM(TransactionCase):
    """ test special behaviors of ORM CRUD functions """

    @mute_logger('odoo.models')
    def test_access_deleted_records(self):
        """ Verify that accessing deleted records works as expected """
        c1 = self.env['res.partner.category'].create({'name': 'W'})
        c2 = self.env['res.partner.category'].create({'name': 'Y'})
        c1.unlink()

        # read() is expected to skip deleted records because our API is not
        # transactional for a sequence of search()->read() performed from the
        # client-side... a concurrent deletion could therefore cause spurious
        # exceptions even when simply opening a list view!
        # /!\ Using unprivileged user to detect former side effects of ir.rules!
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [Command.set([self.ref('base.group_user')])],
        })
        cs = (c1 + c2).with_user(user)
        self.assertEqual([{'id': c2.id, 'name': 'Y'}], cs.read(['name']), "read() should skip deleted records")
        self.assertEqual([], cs[0].read(['name']), "read() should skip deleted records")

        # Deleting an already deleted record should be simply ignored
        self.assertTrue(c1.unlink(), "Re-deleting should be a no-op")

    @mute_logger('odoo.models')
    def test_access_partial_deletion(self):
        """ Check accessing a record from a recordset where another record has been deleted. """
        Model = self.env['res.country']
        self.assertTrue(type(Model).display_name.automatic, "test assumption not satisfied")

        countries = (('Foo', 'ZV'), ('Bar', 'ZX'), ('Baz', 'ZY'))

        def create_records():
            # fresh dicts on every call, so both scenarios start from identical values
            return Model.create([{'name': name, 'code': code} for name, code in countries])

        # access regular field when another record from the same prefetch set has been deleted
        for record in create_records():
            record.name
            record.unlink()

        # access computed field when another record from the same prefetch set has been deleted
        for record in create_records():
            record.display_name
            record.unlink()

    @mute_logger('odoo.models', 'odoo.addons.base.models.ir_rule')
    def test_access_filtered_records(self):
        """ Verify that accessing filtered records works as expected for non-admin user """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [Command.set([self.ref('base.group_user')])],
        })

        partner_model = self.env['ir.model'].search([('model', '=', 'res.partner')])
        # NOTE: despite its name, the domain of this rule filters out p1, i.e.
        # the partner named "W"; p2 ("Y") stays visible.
        self.env['ir.rule'].create({
            'name': 'Y is invisible',
            'domain_force': [('id', '!=', p1.id)],
            'model_id': partner_model.id,
        })

        # search as unprivileged user
        partners = self.env['res.partner'].with_user(user).search([])
        self.assertNotIn(p1, partners, "W should not be visible...")
        self.assertIn(p2, partners, "... but Y should be visible")

        # read as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).read(['name'])
        # write as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).write({'name': 'foo'})
        # unlink as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).unlink()

        # Prepare mixed case
        p2.unlink()
        # read mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).read(['name'])
        # delete mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).unlink()

    def test_read(self):
        """ read() without arguments returns a list """
        partner = self.env['res.partner'].create({'name': 'MyPartner1'})
        result = partner.read()
        self.assertIsInstance(result, list)

    @mute_logger('odoo.models')
    def test_search_read(self):
        """ Check results, ordering and field selection of search_read() """
        partner = self.env['res.partner']
        expected_fields = ('id', 'name', 'display_name', 'email')

        # simple search_read
        partner.create({'name': 'MyPartner1'})
        found = partner.search_read([('name', '=', 'MyPartner1')], ['name'])
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertIn('id', found[0])

        # search_read correct order
        partner.create({'name': 'MyPartner2'})
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertEqual(found[1]['name'], 'MyPartner2')
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name desc")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner2')
        self.assertEqual(found[1]['name'], 'MyPartner1')

        # search_read that finds nothing
        found = partner.search_read([('name', '=', 'Does not exists')], ['name'])
        self.assertEqual(len(found), 0)

        # search_read with an empty array of fields
        found = partner.search_read([], [], limit=1)
        self.assertEqual(len(found), 1)
        # One assertion per field: passing a generator expression to
        # assertTrue() always succeeds, because a generator object is truthy.
        for field in expected_fields:
            self.assertIn(field, found[0])

        # search_read without fields
        found = partner.search_read([], False, limit=1)
        self.assertEqual(len(found), 1)
        for field in expected_fields:
            self.assertIn(field, found[0])

    @mute_logger('odoo.sql_db')
    def test_exists(self):
        """ Check exists() on searched records, new records and a bogus id """
        partner = self.env['res.partner']

        # check that records obtained from search exist
        recs = partner.search([])
        self.assertTrue(recs)
        self.assertEqual(recs.exists(), recs)

        # check that new records exist by convention
        recs = partner.new({})
        self.assertTrue(recs.exists())

        # check that there is no record with id 0
        recs = partner.browse([0])
        self.assertFalse(recs.exists())

    def test_write_duplicate(self):
        """ Writing on a recordset holding the same record twice must work """
        p1 = self.env['res.partner'].create({'name': 'W'})
        (p1 + p1).write({'name': 'X'})
        self.assertEqual(p1.name, 'X')

    def test_m2m_store_trigger(self):
        """ Linking/unlinking a user via the group's 'users' field updates the user's 'share' flag """
        group_user = self.env.ref('base.group_user')

        user = self.env['res.users'].create({
            'name': 'test',
            'login': 'test_m2m_store_trigger',
            'groups_id': [Command.set([])],
        })
        self.assertTrue(user.share)

        group_user.write({'users': [Command.link(user.id)]})
        self.assertFalse(user.share)

        group_user.write({'users': [Command.unlink(user.id)]})
        self.assertTrue(user.share)

    def test_create_multi(self):
        """ create for multiple records """
        # assumption: 'res.bank' does not override 'create'
        vals_list = [{'name': name} for name in ('Foo', 'Bar', 'Baz')]
        vals_list[0]['email'] = 'foo@example.com'
        for vals in vals_list:
            record = self.env['res.bank'].create(vals)
            self.assertEqual(len(record), 1)
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # an empty list of values creates nothing
        records = self.env['res.bank'].create([])
        self.assertFalse(records)

        records = self.env['res.bank'].create(vals_list)
        self.assertEqual(len(records), len(vals_list))
        for record, vals in zip(records, vals_list):
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # create countries and states
        vals_list = [{
            'name': 'Foo',
            'state_ids': [
                Command.create({'name': 'North Foo', 'code': 'NF'}),
                Command.create({'name': 'South Foo', 'code': 'SF'}),
                Command.create({'name': 'West Foo', 'code': 'WF'}),
                Command.create({'name': 'East Foo', 'code': 'EF'}),
            ],
            'code': 'ZV',
        }, {
            'name': 'Bar',
            'state_ids': [
                Command.create({'name': 'North Bar', 'code': 'NB'}),
                Command.create({'name': 'South Bar', 'code': 'SB'}),
            ],
            'code': 'ZX',
        }]
        foo, bar = self.env['res.country'].create(vals_list)
        self.assertEqual(foo.name, 'Foo')
        self.assertCountEqual(foo.mapped('state_ids.code'), ['NF', 'SF', 'WF', 'EF'])
        self.assertEqual(bar.name, 'Bar')
        self.assertCountEqual(bar.mapped('state_ids.code'), ['NB', 'SB'])
|
||
|
|
||
|
|
||
|
class TestInherits(TransactionCase):
    """ Exercise the ORM on models that use _inherits.

    The model under test is res.users, which inherits from res.partner.
    """

    @staticmethod
    def _read_without(record, ignored_fields):
        """ Return the unique read() result of ``record``, stripped of ``ignored_fields``. """
        (values,) = record.read()
        for field_name in ignored_fields:
            del values[field_name]
        return values

    def test_default(self):
        """ `default_get` cannot return a dictionary or a new id """
        Users = self.env['res.users']
        defaults = Users.default_get(['partner_id'])
        if 'partner_id' not in defaults:
            return
        self.assertIsInstance(defaults['partner_id'], (bool, int))

    def test_create(self):
        """ a partner that did not exist before is created along with a new user """
        Partner = self.env['res.partner']
        existing_partners = Partner.search([])
        new_user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})

        self.assertNotIn(new_user.partner_id, existing_partners)

    def test_create_with_ancestor(self):
        """ a user created with an explicit 'partner_id' reuses that partner """
        Partner = self.env['res.partner']
        given_partner = Partner.create({'name': 'Foo'})
        partners_at_start = Partner.search([])
        new_user = self.env['res.users'].create({
            'partner_id': given_partner.id,
            'login': 'foo',
        })
        partners_at_end = Partner.search([])

        self.assertEqual(partners_at_start, partners_at_end)
        self.assertEqual(new_user.name, 'Foo')
        self.assertEqual(new_user.partner_id, given_partner)

    @mute_logger('odoo.models')
    def test_read(self):
        """ an inherited field reads the same on the user and on its partner """
        new_user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
        (from_user,) = new_user.read()
        (from_partner,) = new_user.partner_id.read()

        self.assertEqual(from_user['name'], from_partner['name'])
        self.assertEqual(new_user.name, new_user.partner_id.name)

    @mute_logger('odoo.models')
    def test_copy(self):
        """ copying a user also copies its partner and leaves the original untouched """
        volatile = ('create_date', 'write_date')
        original = self.env['res.users'].create({
            'name': 'Foo',
            'login': 'foo',
            'employee': True,
        })
        values_before = self._read_without(original, volatile)
        duplicate = original.copy({'login': 'bar'})
        values_after = self._read_without(original, volatile)
        self.assertEqual(values_before, values_after)

        self.assertEqual(duplicate.name, 'Foo (copy)')
        self.assertEqual(duplicate.login, 'bar')
        self.assertEqual(original.employee, duplicate.employee)
        self.assertNotEqual(original.id, duplicate.id)
        self.assertNotEqual(original.partner_id.id, duplicate.partner_id.id)

    @mute_logger('odoo.models')
    def test_copy_with_ancestor(self):
        """ copying a user with 'partner_id' in defaults should not duplicate the partner """
        volatile = ('create_date', 'write_date', 'login_date')
        Partner = self.env['res.partner']
        original = self.env['res.users'].create({'login': 'foo', 'name': 'Foo', 'signature': 'Foo'})
        target_partner = Partner.create({'name': 'Bar'})

        values_before = self._read_without(original, volatile)
        partners_at_start = Partner.search([])
        duplicate = original.copy({'partner_id': target_partner.id, 'login': 'bar'})
        values_after = self._read_without(original, volatile)
        partners_at_end = Partner.search([])

        self.assertEqual(values_before, values_after)
        self.assertEqual(partners_at_start, partners_at_end)

        self.assertNotEqual(original.id, duplicate.id)
        self.assertEqual(duplicate.partner_id.id, target_partner.id)
        self.assertEqual(duplicate.login, 'bar', "login is given from copy parameters")
        self.assertFalse(duplicate.password, "password should not be copied from original record")
        self.assertEqual(duplicate.name, 'Bar', "name is given from specific partner")
        self.assertEqual(duplicate.signature, original.signature, "signature should be copied")

    @mute_logger('odoo.models')
    def test_write_date(self):
        """ modifying inherited fields must update write_date """
        current_user = self.env.user
        stamp_before = current_user.write_date

        # write base64 image
        current_user.write({'image_1920': 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw=='})
        stamp_after = current_user.write_date
        self.assertNotEqual(stamp_before, stamp_after)
|
||
|
|
||
|
|
||
|
@tagged('post_install', '-at_install')
class TestCompanyDependent(TransactionCase):
    """ Registry-wide checks on company dependent many2one fields """

    def test_orm_ondelete_cascade(self):
        """ The comodel of a company dependent many2one must not hold any
        many2one field declared with ondelete='cascade'.
        """
        # Situation this test guards against:
        #
        #   model_A
        #      | field_a: company dependent many2one
        #      v
        #   model_B
        #      | field_b: many2one (ondelete='cascade')
        #      v
        #   model_C
        #      | field_c: many2one (ondelete='cascade')
        #      v
        #   model_D
        #
        # A company dependent many2one is stored as jsonb and doesn't have a
        # db ON DELETE action. If a row for model_B is deleted because of
        # ON DELETE CASCADE, model_A will reference a deleted row and have
        # MissingError. This test asks you to move the ON DELETE CASCADE logic
        # to the ORM and remove ondelete='cascade'.
        #
        # Note:
        # the test doesn't force developers to remove ondelete='cascade' for
        # model_C if model_C is not referenced by another company dependent
        # many2one field. But usually it is needed, unless you can accept
        # the value of field_b to be an empty recordset of model_C
        #
        registry = self.env.registry
        for model in registry.values():
            for field in model._fields.values():
                # only company dependent many2one fields are of interest
                if not field.company_dependent or field.type != 'many2one':
                    continue
                comodel = self.env[field.comodel_name]
                for comodel_field in comodel._fields.values():
                    cascades = comodel_field.type == 'many2one' and comodel_field.ondelete == 'cascade'
                    self.assertFalse(
                        cascades,
                        (f'when a row for {comodel_field.comodel_name} is deleted, a row for {comodel_field.model_name} '
                         f'may also be deleted for sake of on delete cascade field {comodel_field}, which may '
                         f'cause MissingError for a company dependent many2one field {field} in the future. '
                         f'Please override the unlink method of {comodel_field.comodel_name} and do the ORM on '
                         f'delete cascade logic and remove/override the ondelete="cascade" of {comodel_field}')
                    )
|