Odoo18-Base/odoo/addons/test_read_group/tests/test_m2m_grouping.py
2025-01-06 10:57:38 +07:00

275 lines
10 KiB
Python

""" Test read_group grouping with many2many fields """
from odoo.fields import Command
from odoo.tests import common
from odoo.addons.base.tests.common import TransactionCaseWithUserDemo
@common.tagged('test_m2m_read_group')
class TestM2MGrouping(TransactionCaseWithUserDemo):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.users = cls.env['test_read_group.user'].create([
{'name': 'Mario'},
{'name': 'Luigi'},
])
cls.tasks = cls.env['test_read_group.task'].create([
{ # both users
'name': "Super Mario Bros.",
'user_ids': [Command.set(cls.users.ids)],
},
{ # mario only
'name': "Paper Mario",
'user_ids': [Command.set(cls.users[0].ids)],
},
{ # luigi only
'name': "Luigi's Mansion",
'user_ids': [Command.set(cls.users[1].ids)],
},
{ # no user
'name': 'Donkey Kong',
'user_ids': [Command.set([])],
},
])
def test_base_users(self):
# group users
user_by_tasks = self.users.read_group(
domain=[],
fields=['name:array_agg'],
groupby=['task_ids'],
)
self.assertEqual(user_by_tasks, [
{ # first task: both users
'task_ids': (self.tasks[0].id, "Super Mario Bros."),
'task_ids_count': 2,
'name': ['Mario', 'Luigi'],
'__domain': [('task_ids', '=', self.tasks[0].id)],
},
{ # second task: Mario only
'task_ids': (self.tasks[1].id, "Paper Mario"),
'task_ids_count': 1,
'name': ['Mario'],
'__domain': [('task_ids', '=', self.tasks[1].id)],
},
{ # third task: Luigi only
'task_ids': (self.tasks[2].id, "Luigi's Mansion"),
'task_ids_count': 1,
'name': ['Luigi'],
'__domain': [('task_ids', '=', self.tasks[2].id)],
},
])
def test_base_tasks(self):
# consider the simplest case first: one task with two users
task_by_users = self.tasks.read_group(
domain=[('id', '=', self.tasks[0].id)],
fields=['name:array_agg'],
groupby=['user_ids'],
)
self.assertEqual(task_by_users, [
{ # task of Mario
'user_ids': (self.users[0].id, "Mario"),
'user_ids_count': 1,
'name': ["Super Mario Bros."],
'__domain': ['&', ('id', '=', self.tasks[0].id), ('user_ids', '=', self.users[0].id)],
},
{ # task of Luigi
'user_ids': (self.users[1].id, "Luigi"),
'user_ids_count': 1,
'name': ["Super Mario Bros."],
'__domain': ['&', ('id', '=', self.tasks[0].id), ('user_ids', '=', self.users[1].id)],
},
])
# now consider the full case: all tasks, with all user combinations
task_by_users = self.tasks.read_group(
domain=[],
fields=['name:array_agg'],
groupby=['user_ids'],
)
self.assertEqual(task_by_users, [
{ # tasks of Mario
'user_ids': (self.users[0].id, "Mario"),
'user_ids_count': 2,
'name': unordered(["Super Mario Bros.", "Paper Mario"]),
'__domain': [('user_ids', '=', self.users[0].id)],
},
{ # tasks of Luigi
'user_ids': (self.users[1].id, "Luigi"),
'user_ids_count': 2,
'name': unordered(["Super Mario Bros.", "Luigi's Mansion"]),
'__domain': [('user_ids', '=', self.users[1].id)],
},
{ # tasks of nobody
'user_ids': False,
'user_ids_count': 1,
'name': unordered(["Donkey Kong"]),
'__domain': [('user_ids', 'not in', [self.users[0].id, self.users[1].id])],
},
])
# check that the domain returned by read_group is valid
tasks_from_domain = self.tasks.search(task_by_users[0]['__domain'])
self.assertEqual(tasks_from_domain, self.tasks[:2])
tasks_from_domain = self.tasks.search(task_by_users[1]['__domain'])
self.assertEqual(tasks_from_domain, self.tasks[0] + self.tasks[2])
tasks_from_domain = self.tasks.search(task_by_users[2]['__domain'])
self.assertEqual(tasks_from_domain, self.tasks[3])
def test_complex_case(self):
# group tasks with some ir.rule on users
users_model = self.env['ir.model']._get(self.users._name)
self.env['ir.rule'].create({
'name': "Only The Lone Wanderer allowed",
'model_id': users_model.id,
'domain_force': [('id', '=', self.users[0].id)],
})
# warmup
as_admin = self.tasks.read_group(
domain=[],
fields=['name:array_agg'],
groupby=['user_ids'],
)
# as superuser, ir.rule should not apply
expected = """
SELECT
"test_read_group_task__user_ids"."user_id",
COUNT(*),
ARRAY_AGG("test_read_group_task"."name" ORDER BY "test_read_group_task"."id")
FROM "test_read_group_task"
LEFT JOIN "test_read_group_task_user_rel" AS "test_read_group_task__user_ids" ON ("test_read_group_task"."id" = "test_read_group_task__user_ids"."task_id")
GROUP BY "test_read_group_task__user_ids"."user_id"
ORDER BY "test_read_group_task__user_ids"."user_id" ASC
"""
with self.assertQueries([expected]):
as_admin = self.tasks.read_group(
domain=[],
fields=['name:array_agg'],
groupby=['user_ids'],
)
self.assertEqual(as_admin, [
{ # tasks of Mario
'user_ids': (self.users[0].id, "Mario"),
'user_ids_count': 2,
'name': unordered(["Super Mario Bros.", "Paper Mario"]),
'__domain': [('user_ids', '=', self.users[0].id)],
},
{ # tasks of Luigi
'user_ids': (self.users[1].id, "Luigi"),
'user_ids_count': 2,
'name': unordered(["Super Mario Bros.", "Luigi's Mansion"]),
'__domain': [('user_ids', '=', self.users[1].id)],
},
{ # tasks of nobody
'user_ids': False,
'user_ids_count': 1,
'name': unordered(["Donkey Kong"]),
'__domain': [('user_ids', 'not in', [self.users[0].id, self.users[1].id])],
},
])
# as demo user, ir.rule should apply
tasks = self.tasks.with_user(self.user_demo)
# warming up various caches; this avoids extra queries
tasks.read_group(domain=[], fields=['name:array_agg'], groupby=['user_ids'])
expected = """
SELECT
"test_read_group_task__user_ids"."user_id",
COUNT(*),
ARRAY_AGG("test_read_group_task"."name" ORDER BY "test_read_group_task"."id")
FROM "test_read_group_task"
LEFT JOIN "test_read_group_task_user_rel" AS "test_read_group_task__user_ids"
ON (
"test_read_group_task"."id" = "test_read_group_task__user_ids"."task_id"
AND "test_read_group_task__user_ids"."user_id" IN (
SELECT "test_read_group_user"."id"
FROM "test_read_group_user"
WHERE ("test_read_group_user"."id" = %s)
)
)
GROUP BY "test_read_group_task__user_ids"."user_id"
ORDER BY "test_read_group_task__user_ids"."user_id" ASC
"""
with self.assertQueries([expected]):
as_demo = tasks.read_group(
domain=[],
fields=['name:array_agg'],
groupby=['user_ids'],
)
self.assertEqual(as_demo, [
{ # tasks of Mario
'user_ids': (self.users[0].id, "Mario"),
'user_ids_count': 2,
'name': unordered(['Super Mario Bros.', 'Paper Mario']),
'__domain': [('user_ids', '=', self.users[0].id)],
},
{ # tasks of Luigi and no user
'user_ids': False,
'user_ids_count': 2,
'name': unordered(["Luigi's Mansion", 'Donkey Kong']),
'__domain': [('user_ids', 'not in', self.users[0].ids)],
},
])
for group in as_demo:
self.assertEqual(
group['user_ids_count'],
tasks.search_count(group['__domain']),
'A search using the domain returned by the read_group should give the '
'same number of records as counted in the group',
)
def test_ordered_tasks(self):
"""
Depending on the order of the group_by, you may obtain non-desired behavior.
In this test, we check the operation of read_group in the event that the first
group (defined by orderby) contains no results.
Default order is 'users_ids ASC'
So we reverse the order to have the spot without users in first position.
"""
tasks_by_users = self.tasks.read_group(
domain=[],
fields=['name'],
groupby=['user_ids'],
orderby='user_ids DESC',
)
self.assertEqual(tasks_by_users, [
{ # tasks of no one
'user_ids': False,
'user_ids_count': 1,
'__domain': [('user_ids', 'not in', [self.users[1].id, self.users[0].id])],
},
{ # tasks of Luigi
'user_ids': (self.users[1].id, 'Luigi'),
'user_ids_count': 2,
'__domain': [('user_ids', '=', self.users[1].id)],
},
{ # tasks of Mario
'user_ids': (self.users[0].id, 'Mario'),
'user_ids_count': 2,
'__domain': [('user_ids', '=', self.users[0].id)],
},
])
class unordered(list):
""" A list where equality is interpreted without ordering. """
__slots__ = ()
def __eq__(self, other):
return sorted(self) == sorted(other)
def __ne__(self, other):
return sorted(self) != sorted(other)