549 lines
22 KiB
Python
549 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
# ruff: noqa: E201, E272, E301, E306
|
|
|
|
import secrets
|
|
import textwrap
|
|
from contextlib import closing
|
|
from datetime import timedelta
|
|
from unittest.mock import patch
|
|
from freezegun import freeze_time
|
|
|
|
from odoo import fields
|
|
from odoo.tests.common import TransactionCase, RecordCapturer
|
|
from odoo.tools import mute_logger
|
|
from odoo.addons.base.models.ir_cron import MIN_FAILURE_COUNT_BEFORE_DEACTIVATION, MIN_DELTA_BEFORE_DEACTIVATION
|
|
|
|
|
|
class CronMixinCase:
|
|
def capture_triggers(self, cron_id=None):
|
|
"""
|
|
Get a context manager to get all cron triggers created during
|
|
the context lifetime. While in the context, it exposes the
|
|
triggers created so far from the beginning of the context. When
|
|
the context exits, it doesn't capture new triggers anymore.
|
|
|
|
The triggers are accessible on the `records` attribute of the
|
|
returned object.
|
|
|
|
:param cron_id: An optional cron record id (int) or xmlid (str)
|
|
to only capture triggers for that cron.
|
|
"""
|
|
if isinstance(cron_id, str): # xmlid case
|
|
cron_id = self.env.ref(cron_id).id
|
|
|
|
return RecordCapturer(
|
|
model=self.env['ir.cron.trigger'].sudo(),
|
|
domain=[('cron_id', '=', cron_id)] if cron_id else []
|
|
)
|
|
|
|
@classmethod
|
|
def _get_cron_data(cls, env, priority=5):
|
|
unique = secrets.token_urlsafe(8)
|
|
return {
|
|
'name': f'Dummy cron for TestIrCron {unique}',
|
|
'state': 'code',
|
|
'code': '',
|
|
'model_id': env.ref('base.model_res_partner').id,
|
|
'model_name': 'res.partner',
|
|
'user_id': env.uid,
|
|
'active': True,
|
|
'interval_number': 1,
|
|
'interval_type': 'days',
|
|
'nextcall': fields.Datetime.now() + timedelta(hours=1),
|
|
'lastcall': False,
|
|
'priority': priority,
|
|
}
|
|
|
|
@classmethod
|
|
def _get_partner_data(cls, env):
|
|
unique = secrets.token_urlsafe(8)
|
|
return {'name': f'Dummy partner for TestIrCron {unique}'}
|
|
|
|
|
|
class TestIrCron(TransactionCase, CronMixinCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
freezer = freeze_time(cls.cr.now())
|
|
cls.frozen_datetime = freezer.start()
|
|
cls.addClassCleanup(freezer.stop)
|
|
|
|
cls.cron = cls.env['ir.cron'].create(cls._get_cron_data(cls.env))
|
|
cls.partner = cls.env['res.partner'].create(cls._get_partner_data(cls.env))
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
self.partner.write(self._get_partner_data(self.env))
|
|
self.cron.write(self._get_cron_data(self.env))
|
|
|
|
domain = [('cron_id', '=', self.cron.id)]
|
|
self.env['ir.cron.trigger'].search(domain).unlink()
|
|
self.env['ir.cron.progress'].search(domain).unlink()
|
|
|
|
def test_cron_direct_trigger(self):
|
|
self.cron.code = textwrap.dedent(f"""\
|
|
model.search(
|
|
[("id", "=", {self.partner.id})]
|
|
).write(
|
|
{{"name": "You have been CRONWNED"}}
|
|
)
|
|
""")
|
|
|
|
self.cron.method_direct_trigger()
|
|
|
|
self.assertEqual(self.cron.lastcall, fields.Datetime.now())
|
|
self.assertEqual(self.partner.name, 'You have been CRONWNED')
|
|
|
|
def test_cron_no_job_ready(self):
|
|
self.cron.nextcall = fields.Datetime.now() + timedelta(days=1)
|
|
self.cron.flush_recordset()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertNotIn(self.cron.id, [job['id'] for job in ready_jobs])
|
|
|
|
def test_cron_ready_by_nextcall(self):
|
|
self.cron.nextcall = fields.Datetime.now()
|
|
self.cron.flush_recordset()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertIn(self.cron.id, [job['id'] for job in ready_jobs])
|
|
|
|
def test_cron_ready_by_trigger(self):
|
|
self.cron._trigger()
|
|
self.env['ir.cron.trigger'].flush_model()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertIn(self.cron.id, [job['id'] for job in ready_jobs])
|
|
|
|
def test_cron_unactive_never_ready(self):
|
|
self.cron.active = False
|
|
self.cron.nextcall = fields.Datetime.now()
|
|
self.env.flush_all()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertNotIn(self.cron.id, [job['id'] for job in ready_jobs])
|
|
|
|
def test_cron_ready_jobs_order(self):
|
|
cron_avg = self.cron.copy()
|
|
cron_avg.priority = 5 # average priority
|
|
|
|
cron_high = self.cron.copy()
|
|
cron_high.priority = 0 # highest priority
|
|
|
|
cron_low = self.cron.copy()
|
|
cron_low.priority = 10 # lowest priority
|
|
|
|
crons = cron_high | cron_avg | cron_low # order is important
|
|
crons.write({'nextcall': fields.Datetime.now()})
|
|
crons.flush_recordset()
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
|
|
self.assertEqual(
|
|
[job['id'] for job in ready_jobs if job['id'] in crons._ids],
|
|
list(crons._ids),
|
|
)
|
|
|
|
def test_cron_skip_unactive_triggers(self):
|
|
# Situation: an admin disable the cron and another user triggers
|
|
# the cron to be executed *now*, the cron shouldn't be ready and
|
|
# the trigger should not be stored.
|
|
|
|
self.cron.active = False
|
|
self.cron.nextcall = fields.Datetime.now() + timedelta(days=2)
|
|
self.cron.flush_recordset()
|
|
with self.capture_triggers() as capture:
|
|
self.cron._trigger()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertNotIn(self.cron.id, [job['id'] for job in ready_jobs],
|
|
"the cron shouldn't be ready")
|
|
self.assertFalse(capture.records, "trigger should has been skipped")
|
|
|
|
def test_cron_keep_future_triggers(self):
|
|
# Situation: yesterday an admin disabled the cron, while the
|
|
# cron was disabled, another user triggered it to run today.
|
|
# In case the cron as been re-enabled before "today", it should
|
|
# run.
|
|
|
|
# go yesterday
|
|
self.frozen_datetime.tick(delta=timedelta(days=-1))
|
|
|
|
# admin disable the cron
|
|
self.cron.active = False
|
|
self.cron.nextcall = fields.Datetime.now() + timedelta(days=10)
|
|
self.cron.flush_recordset()
|
|
|
|
# user triggers the cron to run *tomorrow of yesterday (=today)
|
|
with self.capture_triggers() as capture:
|
|
self.cron._trigger(at=fields.Datetime.now() + timedelta(days=1))
|
|
|
|
# admin re-enable the cron
|
|
self.cron.active = True
|
|
self.cron.flush_recordset()
|
|
|
|
# go today, check the cron should run
|
|
self.frozen_datetime.tick(delta=timedelta(days=1))
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertIn(self.cron.id, [job['id'] for job in ready_jobs],
|
|
"cron should be ready")
|
|
self.assertTrue(capture.records, "trigger should has been kept")
|
|
|
|
def test_cron_process_job(self):
|
|
Progress = self.env['ir.cron.progress']
|
|
default_progress_values = {'done': 0, 'remaining': 0, 'timed_out_counter': 0}
|
|
ten_days_ago = fields.Datetime.now() - MIN_DELTA_BEFORE_DEACTIVATION - timedelta(days=2)
|
|
almost_failed = MIN_FAILURE_COUNT_BEFORE_DEACTIVATION - 1
|
|
|
|
def nothing(cron):
|
|
state = {'call_count': 0}
|
|
def f(self):
|
|
state['call_count'] += 1
|
|
return f, state
|
|
|
|
def eleven_success(cron):
|
|
state = {'call_count': 0}
|
|
CALL_TARGET = 11
|
|
def f(self):
|
|
state['call_count'] += 1
|
|
self.env['ir.cron']._notify_progress(
|
|
done=1,
|
|
remaining=CALL_TARGET - state['call_count']
|
|
)
|
|
return f, state
|
|
|
|
def five_success(cron):
|
|
state = {'call_count': 0}
|
|
CALL_TARGET = 5
|
|
def f(self):
|
|
state['call_count'] += 1
|
|
self.env['ir.cron']._notify_progress(
|
|
done=1,
|
|
remaining=CALL_TARGET - state['call_count']
|
|
)
|
|
return f, state
|
|
|
|
def failure(cron):
|
|
state = {'call_count': 0}
|
|
def f(self):
|
|
state['call_count'] += 1
|
|
raise ValueError
|
|
return f, state
|
|
|
|
def failure_partial(cron):
|
|
state = {'call_count': 0}
|
|
CALL_TARGET = 5
|
|
def f(self):
|
|
state['call_count'] += 1
|
|
self.env['ir.cron']._notify_progress(
|
|
done=1,
|
|
remaining=CALL_TARGET - state['call_count']
|
|
)
|
|
self.env.cr.commit()
|
|
raise ValueError
|
|
return f, state
|
|
|
|
def failure_fully(cron):
|
|
state = {'call_count': 0}
|
|
def f(self):
|
|
state['call_count'] += 1
|
|
self.env['ir.cron']._notify_progress(done=1, remaining=0)
|
|
self.env.cr.commit()
|
|
raise ValueError
|
|
return f, state
|
|
|
|
CASES = [
|
|
# IN | OUT
|
|
# callback, curr_failures, trigger, call_count, done_count, fail_count, active,
|
|
( nothing, 0, False, 1, 0, 0, True),
|
|
( nothing, almost_failed, False, 1, 0, 0, True),
|
|
( eleven_success, 0, True, 10, 10, 0, True),
|
|
( eleven_success, almost_failed, True, 10, 10, 0, True),
|
|
( five_success, 0, False, 5, 5, 0, True),
|
|
( five_success, almost_failed, False, 5, 5, 0, True),
|
|
( failure, 0, False, 1, 0, 1, True),
|
|
( failure, almost_failed, False, 1, 0, 0, False),
|
|
(failure_partial, 0, False, 5, 5, 1, True),
|
|
(failure_partial, almost_failed, False, 5, 5, 0, False),
|
|
( failure_fully, 0, False, 1, 1, 1, True),
|
|
( failure_fully, almost_failed, False, 1, 1, 0, False),
|
|
]
|
|
|
|
for cb, curr_failures, trigger, call_count, done_count, fail_count, active in CASES:
|
|
with self.subTest(cb=cb, failure=curr_failures), closing(self.cr.savepoint()):
|
|
self.cron.write({
|
|
'active': True,
|
|
'failure_count': curr_failures,
|
|
'first_failure_date': ten_days_ago if curr_failures else None
|
|
})
|
|
with self.capture_triggers(self.cron.id) as capture:
|
|
if trigger:
|
|
self.cron._trigger()
|
|
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
cb, state = cb(self.cron)
|
|
try:
|
|
with mute_logger('odoo.addons.base.models.ir_cron'),\
|
|
patch.object(self.registry['ir.actions.server'], 'run', cb):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress_values}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
self.cron.invalidate_recordset()
|
|
capture.records.invalidate_recordset()
|
|
|
|
self.assertEqual(self.cron.id in [job['id'] for job in self.cron._get_all_ready_jobs(self.env.cr)], trigger)
|
|
self.assertEqual(state['call_count'], call_count)
|
|
self.assertEqual(Progress.search_count([('cron_id', '=', self.cron.id), ('done', '=', 1)]), done_count)
|
|
self.assertEqual(self.cron.failure_count, fail_count)
|
|
self.assertEqual(self.cron.active, active)
|
|
|
|
def test_cron_retrigger(self):
|
|
Trigger = self.env['ir.cron.trigger']
|
|
Progress = self.env['ir.cron.progress']
|
|
default_progress_values = {'done': 0, 'remaining': 0, 'timed_out_counter': 0}
|
|
|
|
def make_run(cron):
|
|
state = {'call_count': 0}
|
|
CALL_TARGET = 11
|
|
|
|
def run(self):
|
|
state['call_count'] += 1
|
|
self.env['ir.cron']._notify_progress(done=1, remaining=CALL_TARGET - state['call_count'])
|
|
return run, state
|
|
|
|
self.cron._trigger()
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
mocked_run, mocked_run_state = make_run(self.cron)
|
|
try:
|
|
with patch.object(self.registry['ir.actions.server'], 'run', mocked_run):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress_values}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.assertEqual(
|
|
mocked_run_state['call_count'], 10,
|
|
'`run` should have been called ten times',
|
|
)
|
|
self.assertEqual(
|
|
Progress.search_count([('done', '=', 1), ('cron_id', '=', self.cron.id)]), 10,
|
|
'There should be 10 progress log for this cron',
|
|
)
|
|
self.assertEqual(
|
|
Trigger.search_count([('cron_id', '=', self.cron.id)]), 1,
|
|
"One trigger should have been kept",
|
|
)
|
|
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
try:
|
|
with patch.object(self.registry['ir.actions.server'], 'run', mocked_run):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress_values}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertNotIn(
|
|
self.cron.id, [job['id'] for job in ready_jobs],
|
|
'The cron has finished executing'
|
|
)
|
|
self.assertEqual(
|
|
mocked_run_state['call_count'], 10 + 1,
|
|
'`run` should have been called one additional time',
|
|
)
|
|
self.assertEqual(
|
|
Progress.search_count([('done', '=', 1), ('cron_id', '=', self.cron.id)]), 11,
|
|
'There should be 11 progress log for this cron',
|
|
)
|
|
|
|
def test_cron_failed_increase(self):
|
|
self.cron._trigger()
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
default_progress = {'done': 0, 'remaining': 0, 'timed_out_counter': 0}
|
|
try:
|
|
with (
|
|
patch.object(self.registry['ir.cron'], '_callback', side_effect=Exception),
|
|
patch.object(self.registry['ir.cron'], '_notify_admin') as notify,
|
|
):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 1, 'The cron should have failed once')
|
|
self.assertEqual(self.cron.active, True, 'The cron should still be active')
|
|
self.assertFalse(notify.called)
|
|
|
|
self.cron.failure_count = 4
|
|
|
|
self.cron._trigger()
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
try:
|
|
with (
|
|
patch.object(self.registry['ir.cron'], '_callback', side_effect=Exception),
|
|
patch.object(self.registry['ir.cron'], '_notify_admin') as notify,
|
|
):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 5, 'The cron should have failed one more time but not reset (due to time)')
|
|
self.assertEqual(self.cron.active, True, 'The cron should not have been deactivated due to time constraint')
|
|
self.assertFalse(notify.called)
|
|
|
|
self.cron.failure_count = 4
|
|
self.cron.first_failure_date = fields.Datetime.now() - timedelta(days=8)
|
|
|
|
self.cron._trigger()
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
try:
|
|
with (
|
|
patch.object(self.registry['ir.cron'], '_callback', side_effect=Exception),
|
|
patch.object(self.registry['ir.cron'], '_notify_admin') as notify,
|
|
):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 0, 'The cron should have failed one more time and reset to 0')
|
|
self.assertEqual(self.cron.active, False, 'The cron should have been deactivated after 5 failures')
|
|
self.assertTrue(notify.called)
|
|
|
|
def test_cron_timeout_failure(self):
|
|
self.cron._trigger()
|
|
progress = self.env['ir.cron.progress'].create([{
|
|
'cron_id': self.cron.id,
|
|
'remaining': 0,
|
|
'done': 0,
|
|
'timed_out_counter': 3,
|
|
}])
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
|
|
try:
|
|
with mute_logger('odoo.addons.base.models.ir_cron'):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**progress.read(fields=['done', 'remaining', 'timed_out_counter'], load=None)[0], 'progress_id': progress.id, **self.cron.read(load=None)[0]}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 1, 'The cron should have failed once')
|
|
self.assertEqual(self.cron.active, True, 'The cron should still be active')
|
|
|
|
self.cron._trigger()
|
|
self.registry.enter_test_mode(self.cr)
|
|
try:
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**progress.read(fields=['done', 'remaining', 'timed_out_counter'], load=None)[0], 'progress_id': progress.id, **self.cron.read(load=None)[0]}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 0, 'The cron should have succeeded and reset the counter')
|
|
|
|
def test_cron_timeout_success(self):
|
|
self.cron._trigger()
|
|
progress = self.env['ir.cron.progress'].create([{
|
|
'cron_id': self.cron.id,
|
|
'remaining': 0,
|
|
'done': 0,
|
|
'timed_out_counter': 3,
|
|
}])
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
|
|
try:
|
|
with mute_logger('odoo.addons.base.models.ir_cron'):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**progress.read(fields=['done', 'remaining', 'timed_out_counter'], load=None)[0], 'progress_id': progress.id, **self.cron.read(load=None)[0]}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 1, 'The cron should have failed once')
|
|
self.assertEqual(self.cron.active, True, 'The cron should still be active')
|
|
|
|
self.cron._trigger()
|
|
self.registry.enter_test_mode(self.cr)
|
|
try:
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**progress.read(fields=['done', 'remaining', 'timed_out_counter'], load=None)[0], 'progress_id': progress.id, **self.cron.read(load=None)[0]}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertEqual(self.cron.failure_count, 0, 'The cron should have succeeded and reset the counter')
|
|
|
|
def test_acquire_processed_job(self):
|
|
# Test acquire job on already processed jobs
|
|
job = self.env['ir.cron']._acquire_one_job(self.cr, self.cron.id)
|
|
self.assertEqual(job, None, "No error should be thrown, job should just be none")
|
|
|
|
def test_cron_deactivate(self):
|
|
default_progress_values = {'done': 0, 'remaining': 0, 'timed_out_counter': 0}
|
|
|
|
def mocked_run(self):
|
|
self.env['ir.cron']._notify_progress(done=1, remaining=0, deactivate=True)
|
|
|
|
self.cron._trigger()
|
|
self.env.flush_all()
|
|
self.registry.enter_test_mode(self.cr)
|
|
try:
|
|
with patch.object(self.registry['ir.actions.server'], 'run', mocked_run):
|
|
self.registry['ir.cron']._process_job(
|
|
self.registry.db_name,
|
|
self.registry.cursor(),
|
|
{**self.cron.read(load=None)[0], **default_progress_values}
|
|
)
|
|
finally:
|
|
self.registry.leave_test_mode()
|
|
|
|
self.env.invalidate_all()
|
|
self.assertFalse(self.cron.active)
|