131 lines
4.9 KiB
Python
131 lines
4.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
from datetime import timedelta
|
|
from unittest.mock import patch
|
|
from freezegun import freeze_time
|
|
|
|
from odoo import fields
|
|
from odoo.tests.common import TransactionCase, RecordCapturer, get_db_name
|
|
|
|
|
|
class CronMixinCase:
|
|
def capture_triggers(self, cron_id=None):
|
|
"""
|
|
Get a context manager to get all cron triggers created during
|
|
the context lifetime. While in the context, it exposes the
|
|
triggers created so far from the beginning of the context. When
|
|
the context exits, it doesn't capture new triggers anymore.
|
|
|
|
The triggers are accessible on the `records` attribute of the
|
|
returned object.
|
|
|
|
:param cron_id: An optional cron record id (int) or xmlid (str)
|
|
to only capture triggers for that cron.
|
|
"""
|
|
if isinstance(cron_id, str): # xmlid case
|
|
cron_id = self.env.ref(cron_id).id
|
|
|
|
return RecordCapturer(
|
|
model=self.env['ir.cron.trigger'].sudo(),
|
|
domain=[('cron_id', '=', cron_id)] if cron_id else []
|
|
)
|
|
|
|
|
|
class TestIrCron(TransactionCase, CronMixinCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
freezer = freeze_time(cls.cr.now())
|
|
cls.frozen_datetime = freezer.start()
|
|
cls.addClassCleanup(freezer.stop)
|
|
|
|
def setUp(self):
|
|
super(TestIrCron, self).setUp()
|
|
|
|
self.cron = self.env['ir.cron'].create({
|
|
'name': 'TestCron',
|
|
'model_id': self.env.ref('base.model_res_partner').id,
|
|
'state': 'code',
|
|
'code': 'model.search([("name", "=", "TestCronRecord")]).write({"name": "You have been CRONWNED"})',
|
|
'interval_number': 1,
|
|
'interval_type': 'days',
|
|
'numbercall': -1,
|
|
'doall': False,
|
|
})
|
|
self.test_partner = self.env['res.partner'].create({
|
|
'name': 'TestCronRecord'
|
|
})
|
|
self.test_partner2 = self.env['res.partner'].create({
|
|
'name': 'NotTestCronRecord'
|
|
})
|
|
|
|
def test_cron_direct_trigger(self):
|
|
self.assertFalse(self.cron.lastcall)
|
|
self.assertEqual(self.test_partner.name, 'TestCronRecord')
|
|
self.assertEqual(self.test_partner2.name, 'NotTestCronRecord')
|
|
|
|
def patched_now(*args, **kwargs):
|
|
return '2020-10-22 08:00:00'
|
|
|
|
with patch('odoo.fields.Datetime.now', patched_now):
|
|
self.cron.method_direct_trigger()
|
|
|
|
self.assertEqual(fields.Datetime.to_string(self.cron.lastcall), '2020-10-22 08:00:00')
|
|
self.assertEqual(self.test_partner.name, 'You have been CRONWNED')
|
|
self.assertEqual(self.test_partner2.name, 'NotTestCronRecord')
|
|
|
|
def test_cron_skip_unactive_triggers(self):
|
|
# Situation: an admin disable the cron and another user triggers
|
|
# the cron to be executed *now*, the cron shouldn't be ready and
|
|
# the trigger should not be stored.
|
|
|
|
self.cron.active = False
|
|
self.cron.nextcall = fields.Datetime.now() + timedelta(days=2)
|
|
self.cron.flush_recordset()
|
|
with self.capture_triggers() as capture:
|
|
self.cron._trigger()
|
|
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertNotIn(self.cron.id, [job['id'] for job in ready_jobs],
|
|
"the cron shouldn't be ready")
|
|
self.assertFalse(capture.records, "trigger should has been skipped")
|
|
|
|
def test_cron_keep_future_triggers(self):
|
|
# Situation: yesterday an admin disabled the cron, while the
|
|
# cron was disabled, another user triggered it to run today.
|
|
# In case the cron as been re-enabled before "today", it should
|
|
# run.
|
|
|
|
# go yesterday
|
|
self.frozen_datetime.tick(delta=timedelta(days=-1))
|
|
|
|
# admin disable the cron
|
|
self.cron.active = False
|
|
self.cron.nextcall = fields.Datetime.now() + timedelta(days=10)
|
|
self.cron.flush_recordset()
|
|
|
|
# user triggers the cron to run *tomorrow of yesterday (=today)
|
|
with self.capture_triggers() as capture:
|
|
self.cron._trigger(at=fields.Datetime.now() + timedelta(days=1))
|
|
|
|
# admin re-enable the cron
|
|
self.cron.active = True
|
|
self.cron.flush_recordset()
|
|
|
|
# go today, check the cron should run
|
|
self.frozen_datetime.tick(delta=timedelta(days=1))
|
|
ready_jobs = self.registry['ir.cron']._get_all_ready_jobs(self.cr)
|
|
self.assertIn(self.cron.id, [job['id'] for job in ready_jobs],
|
|
"cron should be ready")
|
|
self.assertTrue(capture.records, "trigger should has been kept")
|
|
|
|
def test_cron_null_interval(self):
|
|
self.cron.interval_number = 0
|
|
with self.assertLogs('odoo.addons.base.models.ir_cron', 'ERROR'):
|
|
self.cron._process_job(get_db_name(), self.env.cr, self.cron.read(load=False)[0])
|
|
self.cron.invalidate_recordset(['active'])
|
|
self.assertFalse(self.cron.active)
|