# Part of Odoo. See LICENSE file for full copyright and licensing details. from markupsafe import Markup from odoo.addons.mail.tests.common import mail_new_test_user, MailCommon from odoo.addons.mail.tools.discuss import Store from odoo.exceptions import UserError from odoo.tests.common import tagged, users, HttpCase from odoo.tools import is_html_empty, mute_logger, formataddr @tagged("mail_message", "post_install", "-at_install") class TestMessageValues(MailCommon): @classmethod def setUpClass(cls): super(TestMessageValues, cls).setUpClass() cls.alias_record = cls.env['mail.test.container'].with_context(cls._test_context).create({ 'name': 'Pigs', 'alias_name': 'pigs', 'alias_contact': 'followers', }) cls.Message = cls.env['mail.message'].with_user(cls.user_employee) @users('employee') def test_empty_message(self): """ Test that message is correctly considered as empty (see `_filter_empty()`). Message considered as empty if: - no body or empty body - AND no subtype or no subtype description - AND no tracking values - AND no attachment Check _update_content behavior when voiding messages (cleanup side records: stars, notifications). """ note_subtype = self.env.ref('mail.mt_note') _attach_1 = self.env['ir.attachment'].with_user(self.user_employee).create({ 'name': 'Attach1', 'datas': 'bWlncmF0aW9uIHRlc3Q=', 'res_id': 0, 'res_model': 'mail.compose.message', }) record = self.env['mail.test.track'].create({'name': 'EmptyTesting'}) self.flush_tracking() record.message_subscribe(partner_ids=self.partner_admin.ids, subtype_ids=note_subtype.ids) message = record.message_post( attachment_ids=_attach_1.ids, body='Test', message_type='comment', subtype_id=note_subtype.id, ) message.write({'starred_partner_ids': [(4, self.partner_admin.id)]}) # check content self.assertEqual(len(message.attachment_ids), 1) self.assertFalse(is_html_empty(message.body)) self.assertEqual(len(message.sudo().notification_ids), 1) self.assertEqual(message.notified_partner_ids, self.partner_admin) self.assertEqual(message.starred_partner_ids, self.partner_admin) self.assertFalse(message.sudo().tracking_value_ids) # Reset body case record._message_update_content(message, Markup('


'), attachment_ids=message.attachment_ids.ids) self.assertTrue(is_html_empty(message.body)) self.assertFalse(message.sudo()._filter_empty(), 'Still having attachments') # Subtype content note_subtype.sudo().write({'description': 'Very important discussions'}) record._message_update_content(message, '', []) self.assertFalse(message.attachment_ids) self.assertEqual(message.notified_partner_ids, self.partner_admin) self.assertEqual(message.starred_partner_ids, self.partner_admin) self.assertFalse(message.sudo()._filter_empty(), 'Subtype with description') # Completely void now note_subtype.sudo().write({'description': ''}) self.assertEqual(message.sudo()._filter_empty(), message) record._message_update_content(message, '', []) self.assertFalse(message.notified_partner_ids) self.assertFalse(message.starred_partner_ids) # test tracking values record.write({'user_id': self.user_admin.id}) self.flush_tracking() tracking_message = record.message_ids[0] self.assertFalse(tracking_message.attachment_ids) self.assertTrue(is_html_empty(tracking_message.body)) self.assertFalse(tracking_message.subtype_id.description) self.assertFalse(tracking_message.sudo()._filter_empty(), 'Has tracking values') with self.assertRaises(UserError, msg='Tracking values prevent from updating content'): record._message_update_content(tracking_message, '', []) @mute_logger('odoo.models.unlink') def test_mail_message_to_store_access(self): """ User that doesn't have access to a record should still be able to fetch the record_name inside message _to_store. """ company_2 = self.env['res.company'].create({'name': 'Second Test Company'}) record1 = self.env['mail.test.multi.company'].create({ 'name': 'Test1', 'company_id': company_2.id, }) message = record1.message_post(body='', partner_ids=[self.user_employee.partner_id.id]) # We need to flush and invalidate the ORM cache since the record_name # is already cached from the creation. Otherwise it will leak inside # message _to_store. self.env.flush_all() self.env.invalidate_all() res = Store(message.with_user(self.user_employee), for_current_user=True).get_result() self.assertEqual(res["mail.message"][0].get("record_name"), "Test1") record1.write({"name": "Test2"}) self.env.flush_all() self.env.invalidate_all() res = Store(message.with_user(self.user_employee), for_current_user=True).get_result() self.assertEqual(res["mail.message"][0].get('record_name'), 'Test2') # check model not inheriting from mail.thread -> should not crash record_nothread = self.env['mail.test.nothread'].create({'name': 'NoThread'}) message = self.env['mail.message'].create({ 'model': record_nothread._name, 'res_id': record_nothread.id, }) formatted = Store(message, for_current_user=True).get_result()["mail.message"][0] self.assertEqual(formatted['record_name'], record_nothread.name) def test_records_by_message(self): record1 = self.env["mail.test.simple"].create({"name": "Test1"}) record2 = self.env["mail.test.simple"].create({"name": "Test1"}) record3 = self.env["mail.test.nothread"].create({"name": "Test2"}) messages = self.env["mail.message"].create( [ { "model": record._name, "res_id": record.id, } for record in [record1, record2, record3] ] ) # methods called on batch of message records_by_model_name = messages._records_by_model_name() test_simple_records = records_by_model_name["mail.test.simple"] self.assertEqual(test_simple_records, record1 + record2) self.assertEqual(test_simple_records._prefetch_ids, tuple((record1 + record2).ids)) test_no_thread_records = records_by_model_name["mail.test.nothread"] self.assertEqual(test_no_thread_records, record3) self.assertEqual(test_no_thread_records._prefetch_ids, tuple(record3.ids)) record_by_message = messages._record_by_message() m0_records = record_by_message[messages[0]] self.assertEqual(m0_records, record1) self.assertEqual(m0_records._prefetch_ids, tuple((record1 + record2).ids)) m1_records = record_by_message[messages[1]] self.assertEqual(m1_records, record2) self.assertEqual(m1_records._prefetch_ids, tuple((record1 + record2).ids)) m2_records = record_by_message[messages[2]] self.assertEqual(m2_records, record3) self.assertEqual(m2_records._prefetch_ids, tuple(record3.ids)) # methods called on individual message from a batch: prefetch from batch is kept records_by_model_name = next(iter(messages))._records_by_model_name() test_simple_records = records_by_model_name["mail.test.simple"] self.assertEqual(test_simple_records, record1) self.assertEqual(test_simple_records._prefetch_ids, tuple((record1 + record2).ids)) record_by_message = next(iter(messages))._record_by_message() m0_records = record_by_message[messages[0]] self.assertEqual(m0_records, record1) self.assertEqual(m0_records._prefetch_ids, tuple((record1 + record2).ids)) def test_mail_message_values_body_base64_image(self): msg = self.env['mail.message'].with_user(self.user_employee).create({ 'body': 'taratata ', }) self.assertEqual(len(msg.attachment_ids), 1) self.assertEqual( msg.body, '

taratata image0 ' 'image0

'.format(attachment=msg.attachment_ids[0]) ) @mute_logger('odoo.models.unlink', 'odoo.addons.mail.models.models') @users('employee') def test_mail_message_values_fromto_long_name(self): """ Long headers may break in python if above 68 chars for certain DKIM verification stacks as folding is not done correctly (see ``_notify_get_reply_to_formatted_email`` docstring + commit linked to this test). """ # name would make it blow up: keep only email test_record = self.env['mail.test.container'].browse(self.alias_record.ids) test_record.write({ 'name': 'Super Long Name That People May Enter "Even with an internal quoting of stuff"' }) msg = self.env['mail.message'].create({ 'model': test_record._name, 'res_id': test_record.id }) reply_to_email = f"{test_record.alias_name}@{self.alias_domain}" self.assertEqual(msg.reply_to, reply_to_email, 'Reply-To: use only email when formataddr > 68 chars') # name + company_name would make it blow up: keep record_name in formatting self.company_admin.name = "Company name being about 33 chars" test_record.write({'name': 'Being more than 68 with company name'}) msg = self.env['mail.message'].create({ 'model': test_record._name, 'res_id': test_record.id }) self.assertEqual(msg.reply_to, formataddr((test_record.name, reply_to_email)), 'Reply-To: use recordname as name in format if recordname + company > 68 chars') # no record_name: keep company_name in formatting if ok test_record.write({'name': ''}) msg = self.env['mail.message'].create({ 'model': test_record._name, 'res_id': test_record.id }) self.assertEqual(msg.reply_to, formataddr((self.env.user.company_id.name, reply_to_email)), 'Reply-To: use company as name in format when no record name and still < 68 chars') # no record_name and company_name make it blow up: keep only email self.env.user.company_id.write({'name': 'Super Long Name That People May Enter "Even with an internal quoting of stuff"'}) msg = self.env['mail.message'].create({ 'model': test_record._name, 'res_id': test_record.id }) self.assertEqual(msg.reply_to, reply_to_email, 'Reply-To: use only email when formataddr > 68 chars') # whatever the record and company names, email is too long: keep only email test_record.write({ 'alias_name': 'Waaaay too long alias name that should make any reply-to blow the 68 characters limit', 'name': 'Short', }) self.env.user.company_id.write({'name': 'Comp'}) sanitized_alias_name = 'waaaay-too-long-alias-name-that-should-make-any-reply-to-blow-the-68-characters-limit' msg = self.env['mail.message'].create({ 'model': test_record._name, 'res_id': test_record.id }) self.assertEqual(msg.reply_to, f"{sanitized_alias_name}@{self.alias_domain}", 'Reply-To: even a long email is ok as only formataddr is problematic') @mute_logger('odoo.models.unlink') def test_mail_message_values_fromto_no_document_values(self): msg = self.Message.create({ 'reply_to': 'test.reply@example.com', 'email_from': 'test.from@example.com', }) self.assertIn('-private', msg.message_id.split('@')[0], 'mail_message: message_id for a void message should be a "private" one') self.assertEqual(msg.reply_to, 'test.reply@example.com') self.assertEqual(msg.email_from, 'test.from@example.com') @mute_logger('odoo.models.unlink') def test_mail_message_values_fromto_no_document(self): msg = self.Message.create({}) self.assertIn('-private', msg.message_id.split('@')[0], 'mail_message: message_id for a void message should be a "private" one') reply_to_name = self.env.user.company_id.name reply_to_email = '%s@%s' % (self.alias_catchall, self.alias_domain) self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) # no alias domain -> author self.env.company.alias_domain_id = False self.assertFalse(self.env.company.catchall_email) msg = self.Message.create({}) self.assertIn('-private', msg.message_id.split('@')[0], 'mail_message: message_id for a void message should be a "private" one') self.assertEqual(msg.reply_to, formataddr((self.user_employee.name, self.user_employee.email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) @mute_logger('odoo.models.unlink') def test_mail_message_values_fromto_document_alias(self): msg = self.Message.create({ 'model': 'mail.test.container', 'res_id': self.alias_record.id }) self.assertIn('-openerp-%d-mail.test' % self.alias_record.id, msg.message_id.split('@')[0]) reply_to_name = '%s %s' % (self.env.user.company_id.name, self.alias_record.name) reply_to_email = '%s@%s' % (self.alias_record.alias_name, self.alias_domain) self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) # no alias domain, no company catchall -> author self.alias_record.alias_domain_id = False self.env.company.alias_domain_id = False self.assertFalse(self.env.company.catchall_email) msg = self.Message.create({ 'model': 'mail.test.container', 'res_id': self.alias_record.id }) self.assertIn('-openerp-%d-mail.test' % self.alias_record.id, msg.message_id.split('@')[0]) self.assertEqual(msg.reply_to, formataddr((self.user_employee.name, self.user_employee.email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) # alias wins over company, hence no catchall is not an issue self.alias_record.alias_domain_id = self.mail_alias_domain msg = self.Message.create({ 'model': 'mail.test.container', 'res_id': self.alias_record.id }) self.assertIn('-openerp-%d-mail.test' % self.alias_record.id, msg.message_id.split('@')[0]) reply_to_name = '%s %s' % (self.env.company.name, self.alias_record.name) reply_to_email = '%s@%s' % (self.alias_record.alias_name, self.alias_domain) self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) @mute_logger('odoo.models.unlink') def test_mail_message_values_fromto_document_no_alias(self): test_record = self.env['mail.test.simple'].create({'name': 'Test', 'email_from': 'ignasse@example.com'}) msg = self.Message.create({ 'model': 'mail.test.simple', 'res_id': test_record.id }) self.assertIn('-openerp-%d-mail.test.simple' % test_record.id, msg.message_id.split('@')[0]) reply_to_name = '%s %s' % (self.env.user.company_id.name, test_record.name) reply_to_email = '%s@%s' % (self.alias_catchall, self.alias_domain) self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) @mute_logger('odoo.models.unlink') def test_mail_message_values_fromto_document_manual_alias(self): test_record = self.env['mail.test.simple'].create({'name': 'Test', 'email_from': 'ignasse@example.com'}) alias = self.env['mail.alias'].create({ 'alias_name': 'MegaLias', 'alias_model_id': self.env['ir.model']._get('mail.test.simple').id, 'alias_parent_model_id': self.env['ir.model']._get('mail.test.simple').id, 'alias_parent_thread_id': test_record.id, }) msg = self.Message.create({ 'model': 'mail.test.simple', 'res_id': test_record.id }) self.assertIn('-openerp-%d-mail.test.simple' % test_record.id, msg.message_id.split('@')[0]) reply_to_name = '%s %s' % (self.env.user.company_id.name, test_record.name) reply_to_email = '%s@%s' % (alias.alias_name, self.alias_domain) self.assertEqual(msg.reply_to, formataddr((reply_to_name, reply_to_email))) self.assertEqual(msg.email_from, formataddr((self.user_employee.name, self.user_employee.email))) def test_mail_message_values_fromto_reply_to_force_new(self): msg = self.Message.create({ 'model': 'mail.test.container', 'res_id': self.alias_record.id, 'reply_to_force_new': True, }) self.assertIn('reply_to', msg.message_id.split('@')[0]) self.assertNotIn('mail.test.container', msg.message_id.split('@')[0]) self.assertNotIn('-%d-' % self.alias_record.id, msg.message_id.split('@')[0]) def test_mail_message_values_misc(self): """ Test various values on mail.message, notably default values """ msg = self.env['mail.message'].create({'model': self.alias_record._name, 'res_id': self.alias_record.id}) self.assertEqual(msg.message_type, 'comment', 'Message should be comments by default') @tagged("mail_message") class TestMessageLinks(MailCommon, HttpCase): def test_message_link_by_employee(self): record = self.env['mail.test.simple'].create({'name': 'Test1'}) thread_message = record.message_post(body='Thread Message', message_type='comment') deleted_message = record.message_post(body='', message_type='comment') self.authenticate(self.user_employee.login, self.user_employee.login) with self.subTest(thread_message=thread_message): expected_url = self.base_url() + f'/odoo/{thread_message.model}/{thread_message.res_id}?highlight_message_id={thread_message.id}' res = self.url_open(f'/mail/message/{thread_message.id}') self.assertEqual(res.url, expected_url) self.assertEqual(res.url, expected_url) with self.subTest(deleted_message=deleted_message): res = self.url_open(f'/mail/message/{deleted_message.id}') @tagged("mail_message", "mail_store", "post_install", "-at_install") class TestMessageStore(MailCommon, HttpCase): def test_store_data_use_display_name(self): test_record = self.env['mail.test.simple.unnamed'].create({'description': 'Some description'}) user_invalid = mail_new_test_user(self.env, login='invalid', groups='base.group_portal', name='Invalid User', email='invalid email', notification_type='email') test_record.message_subscribe(partner_ids=user_invalid.partner_id.ids) self.authenticate(self.user_employee.login, self.user_employee.password) msg = test_record.message_post(body='Some body', author_id=self.partner_employee.id) # simulate failure self.env['mail.notification'].create({ 'author_id': msg.author_id.id, 'mail_message_id': msg.id, 'res_partner_id': user_invalid.partner_id.id, 'notification_type': 'email', 'notification_status': 'exception', 'failure_type': 'mail_email_invalid', }) res = self.make_jsonrpc_request("/mail/data", {"failures": True}) self.assertEqual([t["name"] for t in res["mail.thread"]], ['Some description'])