import logging import timeit from typing import Literal from odoo import Command from odoo.models import BaseModel from odoo.tests.common import TransactionCase _logger = logging.getLogger(__name__) class TestPerformanceTimeit(TransactionCase): @classmethod def setUpClass(cls): super().setUpClass() cls.Model = cls.env['test_performance.simple.minded'].with_context(active_test=False) def create_parent(size): return cls.Model.create({ 'name': f'parent_{size}_children', 'child_ids': [ Command.create({ 'name': f'{size}_child_{i}', 'active': (i % 4) != 0, }) for i in range(size) ], }) cls.parent_0_child = create_parent(0) cls.parent_1_child = create_parent(1) cls.parent_10_children = create_parent(10) cls.parent_100_children = create_parent(100) cls.parent_1000_children = create_parent(1_000) cls.parent_10000_children = create_parent(10_000) cls.example_domains = [ [('id', '<', 100)], [('id', '<', 100), ('name', '=like', 'par')], [('active', '=', False)], [('parent_id.name', 'like', "100")], [('parent_id', 'like', "100")], ] @classmethod def get_parents(cls): return [ cls.parent_1_child, cls.parent_10_children, cls.parent_100_children, cls.parent_1000_children, cls.parent_10000_children, ] @classmethod def get_test_children(cls, *, max_size=10**6): """Get records for testing, give the max size of the recordset""" all_records = [p.child_ids for p in cls.get_parents()] result = [recs for recs in all_records if len(recs) < max_size] # find the next bigger and trucate it to max size if bigger := next((recs for recs in all_records if len(recs) > max_size), None): result.append(bigger[:max_size]) return result def setUp(self): super().setUp() # Warm up the cache of all data self.Model.with_context(active_test=False).search([]).mapped('name') def launch_perf( self, code: str, records: BaseModel, *, relative_size: int = 1, # relative size of the batch (for comparisons) number: int = 100, # number of runs in each iteration repeat: int = 3, # number of repeated runs ctx: dict = {}, # additional local variables ) -> float: """Run a performance test. Returns the best execution run time in microseconds. The number of runs is `number * repeat`. """ assert repeat > 1, "repeat at least twice as the first result is often slower" assert number > 0, "number of runs must be positive" times = timeit.repeat(code, globals={**ctx, 'records': records}, repeat=repeat, number=number) best_mean = min(times) / number * 1_000_000 if relative_size != 1: _logger.info(" `%s` takes %.3fµs (%.3fµs/%d)", code, best_mean / relative_size, best_mean, relative_size) else: _logger.info(" `%s` takes %.3fµs", code, best_mean) return best_mean def launch_perf_set( self, code: str, *, record_list: list[BaseModel] | None = None, relative_size: list[int] | None = None, check_type: Literal['linear', 'maybe-linear', None] = 'linear', number: int = 4, repeat: int = 3, **kw, ): # initialize the record list with the children records if not record_list: record_list = self.get_test_children() # relative sizes are initialized to 1, 10, 100, ... relative_sizes = relative_size or [10 ** i for i in range(len(record_list))] assert len(relative_sizes) == len(record_list) results = [ self.launch_perf(code, records=records, relative_size=relative_size, repeat=repeat, number=number, **kw) for records, relative_size in zip(record_list, relative_sizes) ] # checks if len(results) <= 3: check_type = None if check_type in ('linear', 'maybe-linear'): # approximative check that the resulting runs are behaving linearly # skip the first result as it is very small and not comparable check_results = [r / s for r, s in zip(results, relative_sizes)][1:] min_time = check_results[0] # take the time for the first check result as a comparison point max_time = max(check_results) # just check that the biggest difference of timings per record # compared to minimum run time is not greater than the max_tolerance max_tolerance = 2.5 if check_type == 'linear': self.assertLess(max_time / min_time, max_tolerance, f"Non-linear behaviour detected, relative results: {check_results}") else: _logger.info("Linear behaviour result is %s for %s", max_time / min_time < max_tolerance, check_results) else: self.assertFalse(check_type, "Unsupported check_type") return results def test_perf_field_get(self): self.launch_perf("records.name", self.parent_0_child) def test_perf_field_getitem(self): self.launch_perf("records['name']", self.parent_0_child) def test_perf_field_set(self): self.launch_perf("records.name = 'ok'", self.parent_0_child) self.launch_perf_set("records.name = records[0].name") def test_perf_field_set_flush(self): self.launch_perf("records.flush_recordset()", self.parent_0_child) self.launch_perf("records.write({'name': 'ok'}); records.flush_recordset()", self.parent_0_child) def test_perf_filtered_by_field(self): self.launch_perf_set("records.filtered('active')") def test_perf_mapped(self): self.launch_perf_set("records.mapped('name')") def test_perf_sorted(self): self.launch_perf_set("records.sorted('name')") def test_perf_access_one2many_active_test(self): record_list = [ p.with_context(active_test=True) for p in self.get_parents() ] self.launch_perf_set("records.child_ids", record_list=record_list) def test_perf_access_iter(self): self.launch_perf_set("list(records)") def test_perf_as_query(self): self.launch_perf_set("records._as_query()", number=100) def test_perf_exists(self): self.launch_perf_set("records.exists()") def test_perf_search_query(self): self.launch_perf("records._search([])", self.Model) self.launch_perf("records._search([], limit=10)", self.Model) self.launch_perf("records._search([], order='id')", self.Model) self.launch_perf("records._search([], order='name')", self.Model) self.launch_perf("records._search([], order='parent_id, id desc')", self.Model) def test_perf_domain_search(self): for domain in self.example_domains: self.launch_perf(f"records._search({domain!r})", self.Model) def test_perf_domain_filtered(self): for domain in self.example_domains: self.launch_perf_set(f"records.filtered_domain({domain!r})", repeat=2) def test_perf_xxlarge_domain(self): def large_domain(records): N = len(records) return ['|'] * (N - 1) + [('name', '=', 'admin')] * N ctx = {'dom': large_domain} # _search() self.launch_perf_set("records._search(dom(records))", ctx=ctx, repeat=2, number=3, check_type='maybe-linear') # search() with result, minimal run times, just to check if we can handle the query execution self.launch_perf_set("records.search(dom(records))", # max is set to 9.5k because for 10k we get an out of memory error record_list=self.get_test_children(max_size=9500), ctx=ctx, repeat=2, number=1, check_type='maybe-linear') # filtered_domain() is non-linear and may time-out! self.launch_perf_set("records.filtered_domain(dom(records))", record_list=self.get_test_children(max_size=400), ctx=ctx, repeat=2, number=2, check_type=None) def test_perf_xxlarge_domain_unique(self): def large_domain_uniq(records): N = len(records) return ['|'] * (N - 1) + [('name', '=', str(i)) for i in range(N)] ctx = {'dom': large_domain_uniq} self.launch_perf_set("records._search(dom(records))", ctx=ctx, repeat=2, number=3, check_type='maybe-linear')