Odoo18-Base/odoo/addons/base/tests/test_orm.py
2025-03-04 11:07:12 +07:00

352 lines
15 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from odoo.exceptions import AccessError
from odoo.tests.common import TransactionCase, tagged
from odoo.tools import mute_logger
from odoo import Command
class TestORM(TransactionCase):
    """ test special behaviors of ORM CRUD functions """

    @mute_logger('odoo.models')
    def test_access_deleted_records(self):
        """ Verify that accessing deleted records works as expected """
        c1 = self.env['res.partner.category'].create({'name': 'W'})
        c2 = self.env['res.partner.category'].create({'name': 'Y'})
        c1.unlink()

        # read() is expected to skip deleted records because our API is not
        # transactional for a sequence of search()->read() performed from the
        # client-side... a concurrent deletion could therefore cause spurious
        # exceptions even when simply opening a list view!
        # /!\ Using unprivileged user to detect former side effects of ir.rules!
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [Command.set([self.ref('base.group_user')])],
        })
        cs = (c1 + c2).with_user(user)
        self.assertEqual([{'id': c2.id, 'name': 'Y'}], cs.read(['name']), "read() should skip deleted records")
        self.assertEqual([], cs[0].read(['name']), "read() should skip deleted records")

        # Deleting an already deleted record should be simply ignored
        self.assertTrue(c1.unlink(), "Re-deleting should be a no-op")

    @mute_logger('odoo.models')
    def test_access_partial_deletion(self):
        """ Check accessing a record from a recordset where another record has been deleted. """
        Model = self.env['res.country']
        self.assertTrue(type(Model).display_name.automatic, "test assumption not satisfied")

        # access regular field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name[0], 'code': name[1]} for name in (['Foo', 'ZV'], ['Bar', 'ZX'], ['Baz', 'ZY'])])
        for record in records:
            # the bare attribute access is the check: it must not raise
            record.name
            record.unlink()

        # access computed field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name[0], 'code': name[1]} for name in (['Foo', 'ZV'], ['Bar', 'ZX'], ['Baz', 'ZY'])])
        for record in records:
            # the bare attribute access is the check: it must not raise
            record.display_name
            record.unlink()

    @mute_logger('odoo.models', 'odoo.addons.base.models.ir_rule')
    def test_access_filtered_records(self):
        """ Verify that accessing filtered records works as expected for non-admin user """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})

        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [Command.set([self.ref('base.group_user')])],
        })

        partner_model = self.env['ir.model'].search([('model', '=', 'res.partner')])
        # NOTE: despite the rule's name, its domain excludes p1, i.e. the
        # partner named "W"; p2 ("Y") stays visible.
        self.env['ir.rule'].create({
            'name': 'Y is invisible',
            'domain_force': [('id', '!=', p1.id)],
            'model_id': partner_model.id,
        })

        # search as unprivileged user
        partners = self.env['res.partner'].with_user(user).search([])
        self.assertNotIn(p1, partners, "W should not be visible...")
        self.assertIn(p2, partners, "... but Y should be visible")

        # read as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).read(['name'])
        # write as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).write({'name': 'foo'})
        # unlink as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).unlink()

        # Prepare mixed case
        p2.unlink()
        # read mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).read(['name'])
        # delete mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).unlink()

    def test_read(self):
        """ read() without arguments on a single record returns a list """
        partner = self.env['res.partner'].create({'name': 'MyPartner1'})
        result = partner.read()
        self.assertIsInstance(result, list)

    @mute_logger('odoo.models')
    def test_search_read(self):
        """ Check search_read() results, ordering and default field selection """
        partner = self.env['res.partner']
        # fields expected in the result when no explicit field list is given
        expected_fields = ('id', 'name', 'display_name', 'email')

        # simple search_read
        partner.create({'name': 'MyPartner1'})
        found = partner.search_read([('name', '=', 'MyPartner1')], ['name'])
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertIn('id', found[0])

        # search_read correct order
        partner.create({'name': 'MyPartner2'})
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertEqual(found[1]['name'], 'MyPartner2')
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name desc")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner2')
        self.assertEqual(found[1]['name'], 'MyPartner1')

        # search_read that finds nothing
        found = partner.search_read([('name', '=', 'Does not exists')], ['name'])
        self.assertEqual(len(found), 0)

        # search_read with an empty array of fields
        found = partner.search_read([], [], limit=1)
        self.assertEqual(len(found), 1)
        # Assert each field on its own: handing a generator expression to
        # assertTrue() always succeeds, since a generator object is truthy.
        for field in expected_fields:
            self.assertIn(field, found[0])

        # search_read without fields
        found = partner.search_read([], False, limit=1)
        self.assertEqual(len(found), 1)
        for field in expected_fields:
            self.assertIn(field, found[0])

    @mute_logger('odoo.sql_db')
    def test_exists(self):
        """ Check exists() on searched records, new records and id 0 """
        partner = self.env['res.partner']

        # check that records obtained from search exist
        recs = partner.search([])
        self.assertTrue(recs)
        self.assertEqual(recs.exists(), recs)

        # check that new records exist by convention
        recs = partner.new({})
        self.assertTrue(recs.exists())

        # check that there is no record with id 0
        recs = partner.browse([0])
        self.assertFalse(recs.exists())

    def test_write_duplicate(self):
        """ write() on a recordset holding the same record twice must not raise """
        p1 = self.env['res.partner'].create({'name': 'W'})
        (p1 + p1).write({'name': 'X'})

    def test_m2m_store_trigger(self):
        """ Linking/unlinking a user through the group's `users` field updates `share` """
        group_user = self.env.ref('base.group_user')

        user = self.env['res.users'].create({
            'name': 'test',
            'login': 'test_m2m_store_trigger',
            'groups_id': [Command.set([])],
        })
        self.assertTrue(user.share)

        group_user.write({'users': [Command.link(user.id)]})
        self.assertFalse(user.share)

        group_user.write({'users': [Command.unlink(user.id)]})
        self.assertTrue(user.share)

    def test_create_multi(self):
        """ create for multiple records """
        # assumption: 'res.bank' does not override 'create'
        vals_list = [{'name': name} for name in ('Foo', 'Bar', 'Baz')]
        vals_list[0]['email'] = 'foo@example.com'
        for vals in vals_list:
            record = self.env['res.bank'].create(vals)
            self.assertEqual(len(record), 1)
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # an empty list of values creates nothing
        records = self.env['res.bank'].create([])
        self.assertFalse(records)

        records = self.env['res.bank'].create(vals_list)
        self.assertEqual(len(records), len(vals_list))
        for record, vals in zip(records, vals_list):
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # create countries and states
        vals_list = [{
            'name': 'Foo',
            'state_ids': [
                Command.create({'name': 'North Foo', 'code': 'NF'}),
                Command.create({'name': 'South Foo', 'code': 'SF'}),
                Command.create({'name': 'West Foo', 'code': 'WF'}),
                Command.create({'name': 'East Foo', 'code': 'EF'}),
            ],
            'code': 'ZV',
        }, {
            'name': 'Bar',
            'state_ids': [
                Command.create({'name': 'North Bar', 'code': 'NB'}),
                Command.create({'name': 'South Bar', 'code': 'SB'}),
            ],
            'code': 'ZX',
        }]
        foo, bar = self.env['res.country'].create(vals_list)
        self.assertEqual(foo.name, 'Foo')
        self.assertCountEqual(foo.mapped('state_ids.code'), ['NF', 'SF', 'WF', 'EF'])
        self.assertEqual(bar.name, 'Bar')
        self.assertCountEqual(bar.mapped('state_ids.code'), ['NB', 'SB'])
class TestInherits(TransactionCase):
    """ Exercise the ORM on a model that uses _inherits;
        concretely res.users, which inherits from res.partner.
    """

    def test_default(self):
        """ the 'partner_id' default, when present, must be a bool or an int """
        Users = self.env['res.users']
        defaults = Users.default_get(['partner_id'])
        if 'partner_id' not in defaults:
            return
        self.assertIsInstance(defaults['partner_id'], (bool, int))

    def test_create(self):
        """ a newly created user gets a partner that did not exist beforehand """
        existing_partners = self.env['res.partner'].search([])
        new_user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
        self.assertNotIn(new_user.partner_id, existing_partners)

    def test_create_with_ancestor(self):
        """ passing an explicit 'partner_id' on create must reuse that partner """
        Partner = self.env['res.partner']
        given_partner = Partner.create({'name': 'Foo'})

        snapshot_before = Partner.search([])
        new_user = self.env['res.users'].create({'partner_id': given_partner.id, 'login': 'foo'})
        snapshot_after = Partner.search([])

        # no partner appeared or disappeared around the user creation
        self.assertEqual(snapshot_before, snapshot_after)
        self.assertEqual(new_user.name, 'Foo')
        self.assertEqual(new_user.partner_id, given_partner)

    @mute_logger('odoo.models')
    def test_read(self):
        """ fields inherited from the partner are readable directly on the user """
        new_user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
        [from_user] = new_user.read()
        [from_partner] = new_user.partner_id.read()
        self.assertEqual(from_user['name'], from_partner['name'])
        self.assertEqual(new_user.name, new_user.partner_id.name)

    @mute_logger('odoo.models')
    def test_copy(self):
        """ duplicating a user must duplicate its partner as well """
        # keys removed from the read() snapshots before comparing them
        ignored_keys = ('create_date', 'write_date')

        source_user = self.env['res.users'].create({
            'name': 'Foo',
            'login': 'foo',
            'employee': True,
        })
        [snapshot_before] = source_user.read()
        for key in ignored_keys:
            del snapshot_before[key]

        duplicate = source_user.copy({'login': 'bar'})

        [snapshot_after] = source_user.read()
        for key in ignored_keys:
            del snapshot_after[key]

        # the source user is left untouched by the copy
        self.assertEqual(snapshot_before, snapshot_after)

        self.assertEqual(duplicate.name, 'Foo (copy)')
        self.assertEqual(duplicate.login, 'bar')
        self.assertEqual(source_user.employee, duplicate.employee)
        self.assertNotEqual(source_user.id, duplicate.id)
        self.assertNotEqual(source_user.partner_id.id, duplicate.partner_id.id)

    @mute_logger('odoo.models')
    def test_copy_with_ancestor(self):
        """ copying a user with 'partner_id' in defaults should not duplicate the partner """
        # keys removed from the read() snapshots before comparing them
        ignored_keys = ('create_date', 'write_date', 'login_date')
        Partner = self.env['res.partner']

        source_user = self.env['res.users'].create({'login': 'foo', 'name': 'Foo', 'signature': 'Foo'})
        target_partner = Partner.create({'name': 'Bar'})

        [snapshot_before] = source_user.read()
        for key in ignored_keys:
            del snapshot_before[key]
        partners_before = Partner.search([])

        duplicate = source_user.copy({'partner_id': target_partner.id, 'login': 'bar'})

        [snapshot_after] = source_user.read()
        for key in ignored_keys:
            del snapshot_after[key]
        partners_after = Partner.search([])

        # neither the source user nor the set of partners changed
        self.assertEqual(snapshot_before, snapshot_after)
        self.assertEqual(partners_before, partners_after)

        self.assertNotEqual(source_user.id, duplicate.id)
        self.assertEqual(duplicate.partner_id.id, target_partner.id)
        self.assertEqual(duplicate.login, 'bar', "login is given from copy parameters")
        self.assertFalse(duplicate.password, "password should not be copied from original record")
        self.assertEqual(duplicate.name, 'Bar', "name is given from specific partner")
        self.assertEqual(duplicate.signature, source_user.signature, "signature should be copied")

    @mute_logger('odoo.models')
    def test_write_date(self):
        """ writing on an inherited field must change the user's write_date """
        current_user = self.env.user
        stamp_before = current_user.write_date

        # the value written is a base64 image
        current_user.write({'image_1920': 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw=='})

        stamp_after = current_user.write_date
        self.assertNotEqual(stamp_before, stamp_after)
@tagged('post_install', '-at_install')
class TestCompanyDependent(TransactionCase):
    """ Checks run over every company dependent field found in the registry """

    def test_orm_ondelete_restrict(self):
        """ A comodel of a company dependent many2one with ondelete='restrict'
            must not itself hold a many2one with ondelete='cascade'.
        """
        # Situation being guarded against:
        #
        #   model_A
        #      | field_a: company dependent many2one (ondelete='restrict')
        #      v
        #   model_B
        #      | field_b: many2one (ondelete='cascade')
        #      v
        #   model_C
        #
        # A company dependent many2one is stored as jsonb and doesn't have a
        # db ON DELETE action. If a row for model_B is deleted because of
        # ON DELETE CASCADE, model_A will reference a deleted row and
        # logically be NULL when read.
        #
        # This test asks you to move the ON DELETE CASCADE logic of model_B
        # to the ORM and remove ondelete='cascade'.
        for model in self.env.registry.values():
            for field in model._fields.values():
                # evaluation order matters: `ondelete` is only looked at once
                # the field is known to be a company dependent many2one
                is_restricted_m2o = (
                    field.company_dependent
                    and field.type == 'many2one'
                    and field.ondelete.lower() == 'restrict'
                )
                if not is_restricted_m2o:
                    continue

                comodel = self.env[field.comodel_name]
                for comodel_field in comodel._fields.values():
                    is_cascading_m2o = comodel_field.type == 'many2one' and comodel_field.ondelete == 'cascade'
                    failure_message = (
                        f'when a row for {comodel_field.comodel_name} is deleted, a row for {comodel_field.model_name} '
                        f'may also be deleted for sake of on delete cascade field {comodel_field}, which will '
                        f'bypass the ORM ondelete="restrict" check for a company dependent many2one field {field}. '
                        f'Please override the unlink method of {comodel_field.comodel_name} and do the ORM on '
                        f'delete cascade logic and remove/override the ondelete="cascade" of {comodel_field}'
                    )
                    self.assertFalse(is_cascading_m2o, failure_message)