# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from ast import literal_eval
from contextlib import contextmanager
from datetime import datetime
from unittest.mock import patch

from dateutil.relativedelta import relativedelta
from freezegun import freeze_time
from lxml import html
from werkzeug.urls import url_encode, url_join

from odoo import SUPERUSER_ID
from odoo.addons.base.tests.common import HttpCaseWithUserDemo
from odoo.addons.digest.tests.common import TestDigestCommon
from odoo.addons.mail.tests.common import MailCommon
from odoo.tests import tagged
from odoo.tests.common import users
from odoo.tools import mute_logger


class TestDigest(TestDigestCommon):
    """ Digest tests: KPI values, generated mail content, (un)subscription
    and periodicity tone down depending on user connection logs. """

    @contextmanager
    def mock_datetime_and_now(self, mock_dt):
        """ Used when synchronization date (using env.cr.now()) is important
        in addition to standard datetime mocks. Used mainly to detect sync
        issues.

        :param datetime mock_dt: value frozen for ``datetime`` and returned
          by ``self.env.cr.now()`` while the context is active;
        """
        with freeze_time(mock_dt), \
             patch.object(self.env.cr, 'now', lambda: mock_dt):
            yield

    @classmethod
    def setUpClass(cls):
        """ Prepare a clean message / log state and create two digests
        (daily without users, weekly with admin and employee) at
        ``reference_datetime``. """
        super().setUpClass()
        cls.reference_datetime = datetime(2024, 2, 13, 13, 30, 0)

        # clean messages
        cls.env['mail.message'].search([
            ('subtype_id', '=', cls.env.ref('mail.mt_comment').id),
            ('message_type', 'in', ('comment', 'email', 'email_outgoing')),
        ]).unlink()
        cls._setup_messages()

        # clean demo users so that we keep only the test users
        cls.env['res.users'].search([('login', 'in', ['demo', 'portal'])]).action_archive()
        # clean logs so that tone down can be tested
        cls.env['res.users.log'].search([('create_uid', 'in', (cls.user_admin + cls.user_employee).ids)]).unlink()
        # create logs for user_admin
        cls._setup_logs_for_users(cls.user_admin, cls.reference_datetime - relativedelta(days=5))

        # mock_datetime_and_now is an instance method: at class level the
        # class itself is given as 'self', it only needs 'env'
        with cls.mock_datetime_and_now(cls, cls.reference_datetime):
            cls.test_digest, cls.test_digest_2 = cls.env['digest.digest'].create([
                {
                    "kpi_mail_message_total": True,
                    "kpi_res_users_connected": True,
                    "name": "My Digest",
                    "periodicity": "daily",
                }, {
                    "kpi_mail_message_total": True,
                    "kpi_res_users_connected": True,
                    "name": "My Digest",
                    "periodicity": "weekly",
                    "user_ids": [(4, cls.user_admin.id), (4, cls.user_employee.id)],
                }
            ])

    @classmethod
    def _setup_logs_for_users(cls, res_users, log_dt):
        """ Create one ``res.users.log`` per user, with time mocked at
        ``log_dt``.

        :param res_users: users set as ``create_uid`` of the created logs;
        :param datetime log_dt: mocked current datetime used during creation;
        """
        with cls.mock_datetime_and_now(cls, log_dt):
            for user in res_users:
                cls.env['res.users.log'].with_user(SUPERUSER_ID).create({
                    'create_uid': user.id,
                })

    @users('admin')
    def test_assert_initial_values(self):
        """ Ensure base values for tests """
        test_digest = self.test_digest.with_user(self.env.user)
        test_digest_2 = self.test_digest_2.with_user(self.env.user)

        self.assertEqual(test_digest.create_date, self.reference_datetime)
        self.assertEqual(test_digest.next_run_date, self.reference_datetime.date() + relativedelta(days=1))
        self.assertEqual(test_digest.periodicity, 'daily')
        self.assertFalse(test_digest.user_ids)

        self.assertEqual(test_digest_2.create_date, self.reference_datetime)
        self.assertEqual(test_digest_2.next_run_date, self.reference_datetime.date() + relativedelta(weeks=1))
        self.assertEqual(test_digest_2.periodicity, 'weekly')
        self.assertEqual(test_digest_2.user_ids, self.user_admin + self.user_employee)

    @users('admin')
    def test_digest_kpi_res_users_connected_value(self):
        """ Check the 'connected users' KPI of each digest once employee and
        admin updated their last login. """
        self.env['res.users.log'].with_user(SUPERUSER_ID).search([]).unlink()

        # Sanity check
        initial_values = self.all_digests.mapped('kpi_res_users_connected_value')
        self.assertEqual(initial_values, [0, 0, 0])

        self.env['res.users'].with_user(self.user_employee)._update_last_login()
        self.env['res.users'].with_user(self.user_admin)._update_last_login()

        # drop cached KPI values so that they are read again
        self.all_digests.invalidate_recordset()

        self.assertEqual(self.digest_1.kpi_res_users_connected_value, 2)
        self.assertEqual(self.digest_2.kpi_res_users_connected_value, 0,
                         msg='This KPI is in an other company')
        self.assertEqual(self.digest_3.kpi_res_users_connected_value, 2,
                         msg='This KPI has no company, should take the current one')

    @users('admin')
    def test_digest_numbers(self):
        """ Send a digest and check the created mail.mail (author, sender,
        state) as well as the message KPI values found in its body. """
        digest = self.env['digest.digest'].browse(self.digest_1.ids)
        digest._action_subscribe_users(self.user_employee)

        # digest creates its mails in auto_delete mode so we need to capture
        # the formatted body during the sending process
        digest.flush_recordset()
        with self.mock_mail_gateway():
            digest.action_send()

        self.assertEqual(len(self._new_mails), 1, "A new mail.mail should have been created")
        mail = self._new_mails[0]
        # check mail.mail content
        self.assertEqual(mail.author_id, self.partner_admin)
        self.assertEqual(mail.email_from, self.company_admin.email_formatted)
        self.assertEqual(mail.state, 'outgoing', 'Mail should use the queue')

        kpi_message_values = html.fromstring(mail.body_html).xpath(
            '//table[@data-field="kpi_mail_message_total"]//*[hasclass("kpi_value")]/text()'
        )
        self.assertEqual(
            [t.strip() for t in kpi_message_values],
            ['3', '8', '15']
        )

    @users('admin')
    def test_digest_subscribe(self):
        """ Check subscribe / unsubscribe actions toggle ``is_subscribed``
        for the employee. """
        digest_user = self.digest_1.with_user(self.user_employee)
        self.assertFalse(digest_user.is_subscribed)

        # subscribe a user so at least one mail gets sent
        digest_user.action_subscribe()
        self.assertTrue(
            digest_user.is_subscribed,
            "check the user was subscribed as action_subscribe will silently "
            "ignore subs of non-employees"
        )

        digest_user.action_unsubscribe()
        self.assertFalse(digest_user.is_subscribed)

    @users('admin')
    def test_digest_tip_description(self):
        """ Check a tip description holding template directives is rendered
        in the mail sent to the user. """
        # NOTE(review): the markup of this template was missing from the file
        # (only the two text nodes were left). It is rebuilt from the
        # assertions below: a 't-set' of 'record_exists' that must not
        # survive rendering, a 'rendered' paragraph that must be kept and a
        # 'not-rendered' one that must be dropped. The 't-value' expression
        # is an assumption -- confirm against the upstream version.
        self.env["digest.tip"].create({
            'name': "Test digest tips",
            'tip_description': """
                <t t-set="record_exists" t-value="object.exists()" />
                <t t-if="record_exists">
                    <p class="rendered">Record exists.</p>
                </t>
                <t t-else="">
                    <p class="not-rendered">Record doesn't exist.</p>
                </t>
            """,
        })
        with self.mock_mail_gateway():
            self.digest_1._action_send_to_user(self.user_employee)
        self.assertEqual(len(self._new_mails), 1, "A new Email should have been created")

        sent_mail_body = html.fromstring(self._new_mails.body_html)
        values_to_check = [
            sent_mail_body.xpath('//t[@t-set="record_exists"]'),
            sent_mail_body.xpath('//p[@class="rendered"]/text()'),
            sent_mail_body.xpath('//p[@class="not-rendered"]/text()')
        ]
        self.assertEqual(
            values_to_check, [[], ['Record exists.'], []],
            "Sent mail should contain properly rendered tip content"
        )

    @users('admin')
    def test_digest_tone_down(self):
        """ Send digests several times in a row: the digest whose only
        subscriber has no logs goes daily -> weekly -> monthly -> quarterly
        while the one having admin subscribed stays weekly. """
        test_digest = self.env['digest.digest'].browse(self.test_digest.ids)
        test_digest_2 = self.env['digest.digest'].browse(self.test_digest_2.ids)
        test_digest._action_subscribe_users(self.user_employee)
        digests = test_digest + test_digest_2  # batch recordset

        # no logs for employee but for admin -> should tone down periodicity of
        # first digest, not the second one (admin being subscribed)
        digests.flush_recordset()
        current_dt = self.reference_datetime + relativedelta(days=1)
        with self.mock_datetime_and_now(current_dt), \
             self.mock_mail_gateway():
            digests.action_send()

        self.assertEqual(test_digest.next_run_date, current_dt.date() + relativedelta(weeks=1))
        self.assertEqual(test_digest.periodicity, 'weekly')
        self.assertEqual(test_digest_2.next_run_date, current_dt.date() + relativedelta(weeks=1))
        self.assertEqual(test_digest_2.periodicity, 'weekly',
                         'Should not have tone down because admin has logs')

        # no logs for employee -> should tone down periodicity
        with self.mock_datetime_and_now(current_dt), \
             self.mock_mail_gateway():
            digests.action_send()

        self.assertEqual(test_digest.next_run_date, current_dt.date() + relativedelta(months=1))
        self.assertEqual(test_digest.periodicity, 'monthly')
        self.assertEqual(test_digest_2.next_run_date, current_dt.date() + relativedelta(weeks=1))
        self.assertEqual(test_digest_2.periodicity, 'weekly')

        # no logs for employee -> should tone down periodicity
        with self.mock_datetime_and_now(current_dt), \
             self.mock_mail_gateway():
            digests.action_send()

        self.assertEqual(test_digest.next_run_date, current_dt.date() + relativedelta(months=3))
        self.assertEqual(test_digest.periodicity, 'quarterly')
        self.assertEqual(test_digest_2.next_run_date, current_dt.date() + relativedelta(weeks=1))
        self.assertEqual(test_digest_2.periodicity, 'weekly')

    @users('admin')
    def test_digest_tone_down_wlogs(self):
        """ Table-driven check of tone down: for each starting periodicity,
        create employee logs of a given age (or none), send the digest at
        ``reference_datetime`` and check resulting periodicity and next run
        date. The three lists below are parallel: logs to create, initial
        (periodicity, next_run_date), expected (periodicity, next_run_date,
        case label). """
        digest = self.env['digest.digest'].browse(self.digest_1.ids)
        digest._action_subscribe_users(self.user_employee)

        for logs, (periodicity, run_date), (exp_periodicity, exp_run_date, msg) in zip(
            [
                # daily
                [(self.user_employee, self.reference_datetime)],
                [(self.user_employee, self.reference_datetime - relativedelta(days=1, hours=23))],  # two days logs -> do not tone down
                [(self.user_employee, self.reference_datetime - relativedelta(days=2, hours=1))],  # > two days logs -> tone down
                [],  # no logs -> tone down
                # weekly
                [(self.user_employee, self.reference_datetime - relativedelta(days=6))],
                [(self.user_employee, self.reference_datetime - relativedelta(days=8))],  # old logs -> tone down
                [],  # no logs -> tone down
                # monthly
                [(self.user_employee, self.reference_datetime - relativedelta(days=25))],
                [(self.user_employee, self.reference_datetime - relativedelta(days=32))],  # old logs -> tone down
                [],  # no logs -> tone down
                # quarterly
                [(self.user_employee, self.reference_datetime - relativedelta(months=2))],
                [(self.user_employee, self.reference_datetime - relativedelta(months=4))],  # old logs but end of tone down
                [],  # no logs but end of tone down
            ],
            [
                # daily
                ('daily', self.reference_datetime.date()),
                ('daily', self.reference_datetime.date()),
                ('daily', self.reference_datetime.date()),
                ('daily', self.reference_datetime.date()),
                # weekly
                ('weekly', self.reference_datetime.date()),
                ('weekly', self.reference_datetime.date()),
                ('weekly', self.reference_datetime.date()),
                # monthly
                ('monthly', self.reference_datetime.date()),
                ('monthly', self.reference_datetime.date()),
                ('monthly', self.reference_datetime.date()),
                # quarterly
                ('quarterly', self.reference_datetime.date()),
                ('quarterly', self.reference_datetime.date()),
                ('quarterly', self.reference_datetime.date()),
            ],
            [
                # daily
                ('daily', self.reference_datetime.date() + relativedelta(days=1), 'Daily ok'),  # just push date
                ('daily', self.reference_datetime.date() + relativedelta(days=1), 'Daily ok, 2 days - 1 hour'),  # just push date
                ('weekly', self.reference_datetime.date() + relativedelta(weeks=1), 'Daily old logs (2 days + 1 hour)'),  # tone down on daily
                ('weekly', self.reference_datetime.date() + relativedelta(weeks=1), 'Daily no logs'),  # tone down on daily
                # weekly
                ('weekly', self.reference_datetime.date() + relativedelta(weeks=1), 'Weekly ok'),  # just push date
                ('monthly', self.reference_datetime.date() + relativedelta(months=1), 'Weekly old logs'),  # tone down on weekly
                ('monthly', self.reference_datetime.date() + relativedelta(months=1), 'Weekly no logs'),  # tone down on weekly
                # monthly
                ('monthly', self.reference_datetime.date() + relativedelta(months=1), 'Monthly ok'),  # just push date
                ('quarterly', self.reference_datetime.date() + relativedelta(months=3), 'Monthly old logs'),  # tone down on monthly
                ('quarterly', self.reference_datetime.date() + relativedelta(months=3), 'Monthly no logs'),  # tone down on monthly
                # quarterly
                ('quarterly', self.reference_datetime.date() + relativedelta(months=3), 'Quaterly ok'),  # just push date
                ('quarterly', self.reference_datetime.date() + relativedelta(months=3), 'Quaterly ok'),  # just push date
                ('quarterly', self.reference_datetime.date() + relativedelta(months=3), 'Quaterly ok'),  # just push date
            ],
        ):
            with self.subTest(logs=logs, msg=msg, periodicity=periodicity, run_date=run_date):
                digest.write({
                    'next_run_date': run_date,
                    'periodicity': periodicity,
                })
                for log_user, log_dt in logs:
                    self._setup_logs_for_users(log_user, log_dt)

                with self.mock_datetime_and_now(self.reference_datetime), \
                     self.mock_mail_gateway():
                    digest.action_send()

                self.assertEqual(digest.next_run_date, exp_run_date)
                self.assertEqual(digest.periodicity, exp_periodicity)

            # cleanup done outside of the subTest: a failing case must not
            # leave its logs behind for the following ones
            self.env['res.users.log'].with_user(SUPERUSER_ID).search([]).unlink()


@tagged("digest", "mail_mail", "-at_install", "post_install")
class TestUnsubscribe(MailCommon, HttpCaseWithUserDemo):
    """ HTTP tests of digest unsubscription: mail headers, logged user,
    token and one-click routes. """

    def setUp(self):
        """ Create a daily digest having demo user subscribed and keep its
        base URL and demo's unsubscribe token. """
        super().setUp()
        self.test_digest = self.env['digest.digest'].create({
            'kpi_mail_message_total': True,
            'kpi_res_users_connected': True,
            'name': "My Digest",
            'periodicity': 'daily',
            'user_ids': self.user_demo.ids,
        })
        self.test_digest._action_subscribe_users(self.user_demo)
        self.base_url = self.test_digest.get_base_url()
        self.user_demo_unsubscribe_token = self.test_digest._get_unsubscribe_token(self.user_demo.id)

    def test_mail_mail_headers(self):
        """ Test mail generated for digest contains unsubscribe headers """
        digest = self.env['digest.digest'].browse(self.test_digest.ids)
        digest._action_subscribe_users(self.user_employee)

        with self.mock_mail_gateway():
            digest.action_send()

        # find outgoing mail, click on unsubscribe link
        for user in self.user_employee + self.user_demo:
            mail = self._find_mail_mail_wemail(user.email_formatted, "outgoing")
            headers = literal_eval(mail.headers)
            unsubscribe_url = headers.get("List-Unsubscribe", "").strip("<>")
            self.assertTrue(unsubscribe_url)
            self.opener.post(unsubscribe_url)

        self.assertFalse(digest.user_ids, "Users should have been unsubscribed from digest")

    def test_unsubscribe(self):
        """ Test various combination of unsubscribe: logged, using token, ... """
        digest = self.test_digest
        demo_token = digest._get_unsubscribe_token(self.user_demo.id)

        for test_user, is_member, is_logged, token, exp_code in [
            (self.user_demo, True, True, False, 200),  # unsubscribe logged, easy
            (self.user_demo, False, True, False, 200),  # unsubscribe not a member should not crash
            (self.user_demo, False, False, demo_token, 200),  # unsubscribe using a token
            (self.user_demo, False, False, 'probably-not-a-token', 404),  # wrong token -> crash
            (self.user_demo, False, False, False, 404),  # cannot be done unlogged / no token
        ]:
            with self.subTest(user_name=test_user.name, is_member=is_member, is_logged=is_logged, token=token):
                if is_member:
                    digest._action_subscribe_users(test_user)
                    self.assertIn(test_user, digest.user_ids)
                else:
                    digest._action_unsubscribe_users(test_user)
                    self.assertNotIn(test_user, digest.user_ids)

                self.authenticate(test_user.login if is_logged else None, test_user.login if is_logged else None)
                if token:
                    response = self._url_unsubscribe(token=token, user_id=test_user.id)
                else:
                    response = self._url_unsubscribe()

                self.assertEqual(response.status_code, exp_code)
                self.assertNotIn(test_user, digest.user_ids)

    def test_unsubscribe_token_one_click(self):
        """ Test one-click: should be ok with POST, not GET to avoid link crawling """
        self.assertIn(self.user_demo, self.test_digest.user_ids)
        self.authenticate(None, None)

        with mute_logger('odoo.addons.http_routing.models.ir_http'):
            # Ensure we cannot unregister using GET method (method not allowed)
            response = self._url_unsubscribe(token=self.user_demo_unsubscribe_token, user_id=self.user_demo.id,
                                             one_click='1', method='GET')
            self.assertEqual(response.status_code, 405, 'GET method is not allowed')
            self.assertIn(self.user_demo, self.test_digest.user_ids)

        # Ensure we can unregister with POST method
        response = self._url_unsubscribe(token=self.user_demo_unsubscribe_token, user_id=self.user_demo.id,
                                         one_click='1', method='POST')
        self.assertEqual(response.status_code, 200, 'Valid one-click unsubscribe just returns an OK 200')
        self.assertNotIn(self.user_demo, self.test_digest.user_ids)

    def _url_unsubscribe(self, token=None, user_id=None, one_click=None, method='GET'):
        """ Call the unsubscribe route of ``self.test_digest``.

        :param token: if not None, sent as ``token`` query parameter;
        :param user_id: if not None, sent as ``user_id`` query parameter;
        :param one_click: if not None, target the one-click route instead of
          the standard one (its value is not sent);
        :param str method: HTTP method to use, 'GET' or 'POST';

        :return: response returned by ``self.opener``;
        :raise ValueError: if ``method`` is neither 'GET' nor 'POST';
        """
        url_params = {}
        if token is not None:
            url_params['token'] = token
        if user_id is not None:
            url_params['user_id'] = user_id

        if one_click is not None:
            unsubscribe_route = "unsubscribe_oneclik"
        else:
            unsubscribe_route = "unsubscribe"

        url = url_join(self.base_url, f'digest/{self.test_digest.id}/{unsubscribe_route}?{url_encode(url_params)}')
        if method == 'GET':
            return self.opener.get(url, timeout=10, allow_redirects=True)
        if method == 'POST':
            return self.opener.post(url, timeout=10, allow_redirects=True)
        raise ValueError(f'Invalid method {method}')