# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import psycopg2 import pytz import re import smtplib from email import message_from_string from datetime import datetime, timedelta from freezegun import freeze_time from markupsafe import Markup from OpenSSL.SSL import Error as SSLError from socket import gaierror, timeout from unittest.mock import call, patch, PropertyMock from odoo import api, Command, fields, SUPERUSER_ID from odoo.addons.base.models.ir_mail_server import MailDeliveryException from odoo.addons.mail.tests.common import MailCommon from odoo.exceptions import AccessError from odoo.tests import common, tagged, users from odoo.tools import formataddr, mute_logger @tagged('mail_mail') class TestMailMail(MailCommon): @classmethod def setUpClass(cls): super(TestMailMail, cls).setUpClass() cls.test_record = cls.env['mail.test.gateway'].with_context(cls._test_context).create({ 'name': 'Test', 'email_from': 'ignasse@example.com', }).with_context({}) cls.test_message = cls.test_record.message_post(body=Markup('

Message

'), subject='Subject') cls.test_mail = cls.env['mail.mail'].create([{ 'body': Markup('

Body

'), 'email_from': False, 'email_to': 'test@example.com', 'is_notification': True, 'subject': 'Subject', }]) cls.test_notification = cls.env['mail.notification'].create({ 'is_read': False, 'mail_mail_id': cls.test_mail.id, 'mail_message_id': cls.test_message.id, 'notification_type': 'email', 'res_partner_id': cls.partner_employee.id, # not really used for matching except multi-recipients }) cls.emails_falsy = [False, '', ' '] cls.emails_invalid = ['buggy', 'buggy, wrong'] cls.emails_invalid_ascii = ['raoul@example¢¡.com'] cls.emails_valid = ['raoul¢¡@example.com', 'raoul@example.com'] def _reset_data(self): self._init_mail_mock() self.test_mail.write({'failure_reason': False, 'failure_type': False, 'state': 'outgoing'}) self.test_notification.write({'failure_reason': False, 'failure_type': False, 'notification_status': 'ready'}) @users('admin') def test_mail_mail_attachment_access(self): mail = self.env['mail.mail'].create({ 'body_html': 'Test', 'email_to': 'test@example.com', 'partner_ids': [(4, self.user_employee.partner_id.id)], 'attachment_ids': [ (0, 0, {'name': 'file 1', 'datas': 'c2VjcmV0'}), (0, 0, {'name': 'file 2', 'datas': 'c2VjcmV0'}), (0, 0, {'name': 'file 3', 'datas': 'c2VjcmV0'}), (0, 0, {'name': 'file 4', 'datas': 'c2VjcmV0'}), ], }) def _patched_check(self, *args, **kwargs): if self.env.is_superuser(): return if any(attachment.name in ('file 2', 'file 4') for attachment in self): raise AccessError('No access') mail.invalidate_recordset() new_attachment = self.env['ir.attachment'].create({ 'name': 'new file', 'datas': 'c2VjcmV0', }) with patch.object(type(self.env['ir.attachment']), 'check', _patched_check): # Sanity check self.assertEqual(mail.restricted_attachment_count, 2) self.assertEqual(len(mail.unrestricted_attachment_ids), 2) self.assertEqual(mail.unrestricted_attachment_ids.mapped('name'), ['file 1', 'file 3']) # Add a new attachment mail.write({ 'unrestricted_attachment_ids': [Command.link(new_attachment.id)], }) self.assertEqual(mail.restricted_attachment_count, 2) self.assertEqual(len(mail.unrestricted_attachment_ids), 3) self.assertEqual(mail.unrestricted_attachment_ids.mapped('name'), ['file 1', 'file 3', 'new file']) self.assertEqual(len(mail.attachment_ids), 5) # Remove an attachment mail.write({ 'unrestricted_attachment_ids': [Command.unlink(new_attachment.id)], }) self.assertEqual(mail.restricted_attachment_count, 2) self.assertEqual(len(mail.unrestricted_attachment_ids), 2) self.assertEqual(mail.unrestricted_attachment_ids.mapped('name'), ['file 1', 'file 3']) self.assertEqual(len(mail.attachment_ids), 4) # Reset command mail.invalidate_recordset() mail.write({'unrestricted_attachment_ids': [Command.clear()]}) self.assertEqual(len(mail.unrestricted_attachment_ids), 0) self.assertEqual(len(mail.attachment_ids), 2) # Read in SUDO mail.invalidate_recordset() self.assertEqual(mail.sudo().restricted_attachment_count, 2) self.assertEqual(len(mail.sudo().unrestricted_attachment_ids), 0) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_headers(self): """ Test headers management when set on outgoing mail. """ # mail without thread-enabled record base_values = { 'body_html': '

Test

', 'email_to': 'test@example.com', 'headers': {'foo': 'bar'}, } for headers, expected in [ ({'foo': 'bar'}, {'foo': 'bar'}), ("{'foo': 'bar'}", {'foo': 'bar'}), ("{'foo': 'bar', 'baz': '3+2'}", {'foo': 'bar', 'baz': '3+2'}), (['not_a_dict'], {}), ('alsonotadict', {}), ("['not_a_dict']", {}), ("{'invaliddict'}", {}), ]: with self.subTest(headers=headers, expected=expected): mail = self.env['mail.mail'].create([ dict(base_values, headers=headers) ]) with self.mock_mail_gateway(): mail.send() for key, value in expected.items(): self.assertIn(key, self._mails[0]['headers']) self.assertEqual(self._mails[0]['headers'][key], value) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_recipients(self): """ Partner_ids is a field used from mail_message, but not from mail_mail. """ mail = self.env['mail.mail'].sudo().create({ 'body_html': '

Test

', 'email_to': 'test@example.com', 'partner_ids': [(4, self.user_employee.partner_id.id)] }) with self.mock_mail_gateway(): mail.send() self.assertSentEmail(mail.env.user.partner_id, ['test@example.com']) self.assertEqual(len(self._mails), 1) mail = self.env['mail.mail'].sudo().create({ 'body_html': '

Test

', 'email_to': 'test@example.com', 'recipient_ids': [(4, self.user_employee.partner_id.id)], }) with self.mock_mail_gateway(): mail.send() self.assertSentEmail(mail.env.user.partner_id, ['test@example.com']) self.assertSentEmail(mail.env.user.partner_id, [self.user_employee.email_formatted]) self.assertEqual(len(self._mails), 2) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_recipients_cc(self): """ Partner_ids is a field used from mail_message, but not from mail_mail. """ mail = self.env['mail.mail'].sudo().create({ 'body_html': '

Test

', 'email_cc': 'test.cc.1@example.com, "Herbert" ', 'email_to': 'test.rec.1@example.com, "Raoul" ', 'recipient_ids': [(4, self.user_employee.partner_id.id)], }) with self.mock_mail_gateway(): mail.send() # note that formatting is lost for cc self.assertSentEmail(mail.env.user.partner_id, ['test.rec.1@example.com', '"Raoul" '], email_cc=['test.cc.1@example.com', '"Herbert" ']) # don't put CCs as copy of each outgoing email, only the first one (and never # with partner based recipients as those may receive specific links) self.assertSentEmail(mail.env.user.partner_id, [self.user_employee.email_formatted], email_cc=[]) self.assertEqual(len(self._mails), 2) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_recipients_formatting(self): """ Check support of email / formatted email """ mail = self.env['mail.mail'].sudo().create({ 'author_id': False, 'body_html': '

Test

', 'email_cc': 'test.cc.1@example.com, "Herbert" ', 'email_from': '"Ignasse" ', 'email_to': 'test.rec.1@example.com, "Raoul" ', }) with self.mock_mail_gateway(): mail.send() # note that formatting is lost for cc self.assertSentEmail('"Ignasse" ', ['test.rec.1@example.com', '"Raoul" '], email_cc=['test.cc.1@example.com', '"Herbert" ']) self.assertEqual(len(self._mails), 1) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_return_path(self): # mail without thread-enabled record base_values = { 'body_html': '

Test

', 'email_to': 'test@example.com', } mail = self.env['mail.mail'].create(base_values) with self.mock_mail_gateway(): mail.send() self.assertEqual(self._mails[0]['headers']['Return-Path'], '%s@%s' % (self.alias_bounce, self.alias_domain)) # mail on thread-enabled record mail = self.env['mail.mail'].create(dict(base_values, **{ 'model': self.test_record._name, 'res_id': self.test_record.id, })) with self.mock_mail_gateway(): mail.send() self.assertEqual(self._mails[0]['headers']['Return-Path'], '%s@%s' % (self.alias_bounce, self.alias_domain)) @mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.tests') def test_mail_mail_schedule(self): """Test that a mail scheduled in the past/future are sent or not""" now = datetime(2022, 6, 28, 14, 0, 0) scheduled_datetimes = [ # falsy values False, '', 'This is not a date format', # datetimes (UTC/GMT +10 hours for Australia/Brisbane) now, pytz.timezone('Australia/Brisbane').localize(now), # string fields.Datetime.to_string(now - timedelta(days=1)), fields.Datetime.to_string(now + timedelta(days=1)), (now + timedelta(days=1)).strftime("%H:%M:%S %d-%m-%Y"), # tz: is actually 1 hour before now in UTC (now + timedelta(hours=3)).strftime("%H:%M:%S %d-%m-%Y") + " +0400", # tz: is actually 1 hour after now in UTC (now + timedelta(hours=-3)).strftime("%H:%M:%S %d-%m-%Y") + " -0400", ] expected_datetimes = [ False, False, False, now, now - pytz.timezone('Australia/Brisbane').utcoffset(now), now - timedelta(days=1), now + timedelta(days=1), now + timedelta(days=1), now + timedelta(hours=-1), now + timedelta(hours=1), ] expected_states = [ # falsy values = send now 'sent', 'sent', 'sent', 'sent', 'sent', 'sent', 'outgoing', 'outgoing', 'sent', 'outgoing' ] mails = self.env['mail.mail'].create([ {'body_html': '

Test

', 'email_to': 'test@example.com', 'scheduled_date': scheduled_datetime, } for scheduled_datetime in scheduled_datetimes ]) for mail, expected_datetime, scheduled_datetime in zip(mails, expected_datetimes, scheduled_datetimes): self.assertEqual(mail.scheduled_date, expected_datetime, 'Scheduled date: %s should be stored as %s, received %s' % (scheduled_datetime, expected_datetime, mail.scheduled_date)) self.assertEqual(mail.state, 'outgoing') with freeze_time(now): self.env['mail.mail'].process_email_queue() for mail, expected_state in zip(mails, expected_states): self.assertEqual(mail.state, expected_state) @mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.tests') def test_mail_mail_send_configuration(self): """ Test configuration and control of email queue """ self.env['mail.mail'].search([]).unlink() # cleanup queue # test 'mail.mail.queue.batch.size': cron fetch size for queue_batch_size, exp_send_count in [ (3, 3), (0, 10), # maximum available (False, 10), # maximum available ]: with self.subTest(queue_batch_size=queue_batch_size), \ self.mock_mail_gateway(): self.env['ir.config_parameter'].sudo().set_param('mail.mail.queue.batch.size', queue_batch_size) mails = self.env['mail.mail'].create([ { 'auto_delete': False, 'body_html': f'Batch Email {idx}', 'email_from': 'test.from@mycompany.example.com', 'email_to': 'test.outgoing@test.example.com', 'state': 'outgoing', } for idx in range(10) ]) self.env['mail.mail'].process_email_queue() self.assertEqual(len(self._mails), exp_send_count) mails.write({'state': 'sent'}) # avoid conflicts between batch # test 'mail.session.batch.size': batch send size self.env['ir.config_parameter'].sudo().set_param('mail.mail.queue.batch.size', False) for session_batch_size, exp_call_count in [ (3, 4), # 10 mails -> 4 iterations of 3 (0, 1), (False, 1), ]: with self.subTest(session_batch_size=session_batch_size), \ self.mock_mail_gateway(): self.env['ir.config_parameter'].sudo().set_param('mail.session.batch.size', session_batch_size) mails = self.env['mail.mail'].create([ { 'auto_delete': False, 'body_html': f'Batch Email {idx}', 'email_from': 'test.from@mycompany.example.com', 'email_to': 'test.outgoing@test.example.com', 'state': 'outgoing', } for idx in range(10) ]) self.env['mail.mail'].process_email_queue() self.assertEqual(self.mail_mail_private_send_mocked.call_count, exp_call_count) mails.write({'state': 'sent'}) # avoid conflicts between batch @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_send_exceptions_origin(self): """ Test various use case with exceptions and errors and see how they are managed and stored at mail and notification level. """ mail, notification = self.test_mail, self.test_notification # MailServer.build_email(): invalid from (missing) for default_from in [False, '']: self.mail_alias_domain.default_from = default_from self._reset_data() with self.mock_mail_gateway(), mute_logger('odoo.addons.mail.models.mail_mail'): mail.send(raise_exception=False) self.assertFalse(self._mails[0]['email_from']) self.assertEqual( mail.failure_reason, 'You must either provide a sender address explicitly or configure using the combination of `mail.catchall.domain` and `mail.default.from` ICPs, in the server configuration file or with the --email-from startup parameter.') self.assertEqual(mail.failure_type, 'mail_from_missing') self.assertEqual(mail.state, 'exception') self.assertEqual( notification.failure_reason, 'You must either provide a sender address explicitly or configure using the combination of `mail.catchall.domain` and `mail.default.from` ICPs, in the server configuration file or with the --email-from startup parameter.') self.assertEqual(notification.failure_type, 'mail_from_missing') self.assertEqual(notification.notification_status, 'exception') # MailServer.send_email(): _prepare_email_message: unexpected ASCII / Malformed 'Return-Path' or 'From' address # Force bounce alias to void, will force usage of email_from self.mail_alias_domain.bounce_alias = False self.env.company.invalidate_recordset(fnames={'bounce_email', 'bounce_formatted'}) for email_from in ['strange@example¢¡.com', 'robert']: self._reset_data() mail.write({'email_from': email_from}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(self._mails[0]['email_from'], email_from) self.assertEqual(mail.failure_reason, f"Malformed 'Return-Path' or 'From' address: {email_from} - It should contain one valid plain ASCII email") self.assertEqual(mail.failure_type, 'mail_from_invalid') self.assertEqual(mail.state, 'exception') self.assertEqual(notification.failure_reason, f"Malformed 'Return-Path' or 'From' address: {email_from} - It should contain one valid plain ASCII email") self.assertEqual(notification.failure_type, 'mail_from_invalid') self.assertEqual(notification.notification_status, 'exception') @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_send_exceptions_recipients_emails(self): """ Test various use case with exceptions and errors and see how they are managed and stored at mail and notification level. """ mail, notification = self.test_mail, self.test_notification # MailServer.send_email(): _prepare_email_message: missing To for email_to in self.emails_falsy: self._reset_data() mail.write({'email_to': email_to}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.') self.assertFalse(mail.failure_type, 'Mail: missing email_to: no failure type, should be updated') self.assertEqual(mail.state, 'exception') if email_to == ' ': self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated') self.assertEqual(notification.failure_type, 'mail_email_missing') self.assertEqual(notification.notification_status, 'exception') else: self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated') self.assertEqual(notification.failure_type, False, 'Mail: missing email_to: notification is wrongly set as sent') self.assertEqual(notification.notification_status, 'sent', 'Mail: missing email_to: notification is wrongly set as sent') # MailServer.send_email(): _prepare_email_message: invalid To for email_to, failure_type in zip( self.emails_invalid, ['mail_email_missing', 'mail_email_missing'] ): self._reset_data() mail.write({'email_to': email_to}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.') self.assertFalse(mail.failure_type, 'Mail: invalid email_to: no failure type, should be updated') self.assertEqual(mail.state, 'exception') self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated') self.assertEqual(notification.failure_type, failure_type, 'Mail: invalid email_to: missing instead of invalid') self.assertEqual(notification.notification_status, 'exception') # MailServer.send_email(): _prepare_email_message: invalid To (ascii) for email_to in self.emails_invalid_ascii: self._reset_data() mail.write({'email_to': email_to}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.') self.assertFalse(mail.failure_type, 'Mail: invalid (ascii) recipient partner: no failure type, should be updated') self.assertEqual(mail.state, 'exception') self.assertEqual(notification.failure_type, 'mail_email_invalid') self.assertEqual(notification.notification_status, 'exception') # MailServer.send_email(): _prepare_email_message: ok To (ascii or just ok) for email_to in self.emails_valid: self._reset_data() mail.write({'email_to': email_to}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertFalse(mail.failure_reason) self.assertFalse(mail.failure_type) self.assertEqual(mail.state, 'sent') self.assertFalse(notification.failure_reason) self.assertFalse(notification.failure_type) self.assertEqual(notification.notification_status, 'sent') @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_send_exceptions_recipients_partners(self): """ Test various use case with exceptions and errors and see how they are managed and stored at mail and notification level. """ mail, notification = self.test_mail, self.test_notification mail.write({'email_from': 'test.user@test.example.com', 'email_to': False}) partners_falsy = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_falsy ]) partners_invalid = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_invalid ]) partners_invalid_ascii = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_invalid_ascii ]) partners_valid = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_valid ]) # void values for partner in partners_falsy: self._reset_data() mail.write({'recipient_ids': [(5, 0), (4, partner.id)]}) notification.write({'res_partner_id': partner.id}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.') self.assertFalse(mail.failure_type, 'Mail: void recipient partner: no failure type, should be updated') self.assertEqual(mail.state, 'exception') self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated') self.assertEqual(notification.failure_type, 'mail_email_invalid', 'Mail: void recipient partner: should be missing, not invalid') self.assertEqual(notification.notification_status, 'exception') # wrong values for partner in partners_invalid: self._reset_data() mail.write({'recipient_ids': [(5, 0), (4, partner.id)]}) notification.write({'res_partner_id': partner.id}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.') self.assertFalse(mail.failure_type, 'Mail: invalid recipient partner: no failure type, should be updated') self.assertEqual(mail.state, 'exception') self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated') self.assertEqual(notification.failure_type, 'mail_email_invalid') self.assertEqual(notification.notification_status, 'exception') # ascii ko for partner in partners_invalid_ascii: self._reset_data() mail.write({'recipient_ids': [(5, 0), (4, partner.id)]}) notification.write({'res_partner_id': partner.id}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.') self.assertFalse(mail.failure_type, 'Mail: invalid (ascii) recipient partner: no failure type, should be updated') self.assertEqual(mail.state, 'exception') self.assertEqual(notification.failure_type, 'mail_email_invalid') self.assertEqual(notification.notification_status, 'exception') # ascii ok or just ok for partner in partners_valid: self._reset_data() mail.write({'recipient_ids': [(5, 0), (4, partner.id)]}) notification.write({'res_partner_id': partner.id}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertFalse(mail.failure_reason) self.assertFalse(mail.failure_type) self.assertEqual(mail.state, 'sent') self.assertFalse(notification.failure_reason) self.assertFalse(notification.failure_type) self.assertEqual(notification.notification_status, 'sent') @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_send_exceptions_recipients_partners_mixed(self): """ Test various use case with exceptions and errors and see how they are managed and stored at mail and notification level. """ mail, notification = self.test_mail, self.test_notification mail.write({'email_to': 'test@example.com'}) partners_falsy = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_falsy ]) partners_invalid = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_invalid ]) partners_valid = self.env['res.partner'].create([ {'name': 'Name %s' % email, 'email': email} for email in self.emails_valid ]) # valid to, missing email for recipient or wrong email for recipient for partner in partners_falsy + partners_invalid: self._reset_data() mail.write({'recipient_ids': [(5, 0), (4, partner.id)]}) notification.write({'res_partner_id': partner.id}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertFalse(mail.failure_reason, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertFalse(mail.failure_type, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertEqual(mail.state, 'sent', 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertFalse(notification.failure_reason, 'Mail: void email considered as invalid') self.assertEqual(notification.failure_type, 'mail_email_invalid', 'Mail: void email considered as invalid') self.assertEqual(notification.notification_status, 'exception') # update to have valid partner and invalid partner mail.write({'recipient_ids': [(5, 0), (4, partners_valid[1].id), (4, partners_falsy[0].id)]}) notification.write({'res_partner_id': partners_valid[1].id}) notification2 = notification.create({ 'is_read': False, 'mail_mail_id': mail.id, 'mail_message_id': self.test_message.id, 'notification_type': 'email', 'res_partner_id': partners_falsy[0].id, }) # missing to / invalid to for email_to in self.emails_falsy + self.emails_invalid: self._reset_data() notification2.write({'failure_reason': False, 'failure_type': False, 'notification_status': 'ready'}) mail.write({'email_to': email_to}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertFalse(mail.failure_reason, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertFalse(mail.failure_type, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertEqual(mail.state, 'sent', 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertFalse(notification.failure_reason) self.assertFalse(notification.failure_type) self.assertEqual(notification.notification_status, 'sent') self.assertFalse(notification2.failure_reason) self.assertEqual(notification2.failure_type, 'mail_email_invalid') self.assertEqual(notification2.notification_status, 'exception') # buggy to (ascii) for email_to in self.emails_invalid_ascii: self._reset_data() notification2.write({'failure_reason': False, 'failure_type': False, 'notification_status': 'ready'}) mail.write({'email_to': email_to}) with self.mock_mail_gateway(): mail.send(raise_exception=False) self.assertFalse(mail.failure_type, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam') self.assertEqual(mail.state, 'sent') self.assertFalse(notification.failure_type) self.assertEqual(notification.notification_status, 'sent') self.assertEqual(notification2.failure_type, 'mail_email_invalid') self.assertEqual(notification2.notification_status, 'exception') @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_send_exceptions_raise_management(self): """ Test various use case with exceptions and errors and see how they are managed and stored at mail and notification level. """ mail, notification = self.test_mail, self.test_notification mail.write({'email_from': 'test.user@test.example.com', 'email_to': 'test@example.com'}) # SMTP connecting issues with self.mock_mail_gateway(): _connect_current = self.connect_mocked.side_effect # classic errors that may be raised during sending, just to test their current support for error, msg in [ (smtplib.SMTPServerDisconnected('SMTPServerDisconnected'), 'SMTPServerDisconnected'), (smtplib.SMTPResponseException('code', 'SMTPResponseException'), 'code\nSMTPResponseException'), (smtplib.SMTPNotSupportedError('SMTPNotSupportedError'), 'SMTPNotSupportedError'), (smtplib.SMTPException('SMTPException'), 'SMTPException'), (SSLError('SSLError'), 'SSLError'), (gaierror('gaierror'), 'gaierror'), (timeout('timeout'), 'timeout')]: def _connect(*args, **kwargs): raise error self.connect_mocked.side_effect = _connect mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, msg) self.assertFalse(mail.failure_type) self.assertEqual(mail.state, 'exception') self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated') self.assertEqual(notification.failure_type, 'mail_smtp') self.assertEqual(notification.notification_status, 'exception') self._reset_data() self.connect_mocked.side_effect = _connect_current # SMTP sending issues with self.mock_mail_gateway(): _send_current = self.send_email_mocked.side_effect self._reset_data() mail.write({'email_to': 'test@example.com'}) # should always raise for those errors, even with raise_exception=False for error, error_class in [ (smtplib.SMTPServerDisconnected("Some exception"), smtplib.SMTPServerDisconnected), (MemoryError("Some exception"), MemoryError)]: def _send_email(*args, **kwargs): raise error self.send_email_mocked.side_effect = _send_email with self.assertRaises(error_class): mail.send(raise_exception=False) self.assertFalse(mail.failure_reason, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback') self.assertFalse(mail.failure_type, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback') self.assertEqual(mail.state, 'outgoing', 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback') self.assertFalse(notification.failure_reason, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback') self.assertFalse(notification.failure_type, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback') self.assertEqual(notification.notification_status, 'ready', 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback') # MailDeliveryException: should be catched; other issues are sub-catched under # a MailDeliveryException and are catched for error, msg in [ (MailDeliveryException("Some exception"), 'Some exception'), (ValueError("Unexpected issue"), 'Unexpected issue')]: def _send_email(*args, **kwargs): raise error self.send_email_mocked.side_effect = _send_email self._reset_data() mail.send(raise_exception=False) self.assertEqual(mail.failure_reason, msg) self.assertEqual(mail.failure_type, 'unknown', 'Mail: unlogged failure type to fix') self.assertEqual(mail.state, 'exception') self.assertEqual(notification.failure_reason, msg) self.assertEqual(notification.failure_type, 'unknown', 'Mail: generic failure type') self.assertEqual(notification.notification_status, 'exception') self.send_email_mocked.side_effect = _send_current def test_mail_mail_values_misc(self): """ Test various values on mail.mail, notably default values """ msg = self.env['mail.mail'].create({}) self.assertEqual(msg.message_type, 'email_outgoing', 'Mails should have outgoing email type by default') @tagged('mail_mail', 'mail_server') class TestMailMailServer(MailCommon): @classmethod def setUpClass(cls): super().setUpClass() cls.mail_server_domain_2 = cls.env['ir.mail_server'].create({ 'from_filter': 'test_2.com', 'name': 'Server 2', 'smtp_host': 'test_2.com', }) cls.test_record = cls.env['mail.test.gateway'].with_context(cls._test_context).create({ 'name': 'Test', 'email_from': 'ignasse@example.com', }).with_context({}) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_send_server(self): """Test that the mails are send in batch. Batch are defined by the mail server and the email from field. """ self.assertEqual( self.env['ir.mail_server']._get_default_from_address(), f'{self.default_from}@{self.alias_domain}' ) mail_values = { 'body_html': '

Test

', 'email_to': 'user@example.com', } # Should be encapsulated in the notification email mails = self.env['mail.mail'].create([{ **mail_values, 'email_from': 'test@unknown_domain.com', } for _ in range(5)]) | self.env['mail.mail'].create([{ **mail_values, 'email_from': 'test_2@unknown_domain.com', } for _ in range(5)]) # Should use the test_2 mail server # Once with "user_1@test_2.com" as login # Once with "user_2@test_2.com" as login mails += self.env['mail.mail'].create([{ **mail_values, 'email_from': 'user_1@test_2.com', } for _ in range(5)]) + self.env['mail.mail'].create([{ **mail_values, 'email_from': 'user_2@test_2.com', } for _ in range(5)]) # Mail server is forced mails += self.env['mail.mail'].create([{ **mail_values, 'email_from': 'user_1@test_2.com', 'mail_server_id': self.mail_server_domain.id, } for _ in range(5)]) with self.mock_smtplib_connection(): mails.send() self.assertEqual(self.find_mail_server_mocked.call_count, 4, 'Must be called only once per "mail from" when the mail server is not forced') self.assertEqual(len(self.emails), 25) # Check call to the connect method to ensure that we authenticate # to the right mail server with the right login self.assertEqual(self.connect_mocked.call_count, 4, 'Must be called once per batch which share the same mail server and the same smtp from') self.connect_mocked.assert_has_calls( calls=[ call(smtp_from=f'{self.default_from}@{self.alias_domain}', mail_server_id=self.mail_server_notification.id), call(smtp_from='user_1@test_2.com', mail_server_id=self.mail_server_domain_2.id), call(smtp_from='user_2@test_2.com', mail_server_id=self.mail_server_domain_2.id), call(smtp_from='user_1@test_2.com', mail_server_id=self.mail_server_domain.id), ], any_order=True, ) self.assertSMTPEmailsSent(message_from=f'"test" <{self.default_from}@{self.alias_domain}>', emails_count=5, from_filter=self.mail_server_notification.from_filter) self.assertSMTPEmailsSent(message_from=f'"test_2" <{self.default_from}@{self.alias_domain}>', emails_count=5, from_filter=self.mail_server_notification.from_filter) self.assertSMTPEmailsSent(message_from='user_1@test_2.com', emails_count=5, mail_server=self.mail_server_domain_2) self.assertSMTPEmailsSent(message_from='user_2@test_2.com', emails_count=5, mail_server=self.mail_server_domain_2) self.assertSMTPEmailsSent(message_from='user_1@test_2.com', emails_count=5, mail_server=self.mail_server_domain) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_values_email_formatted(self): """ Test outgoing email values, with formatting """ customer = self.env['res.partner'].create({ 'name': 'Tony Customer', 'email': '"Formatted Emails" ', }) mail = self.env['mail.mail'].create({ 'body_html': '

Test

', 'email_cc': '"Ignasse, le Poilu" ', 'email_to': '"Raoul, le Grand" , "Micheline, l\'immense" ', 'recipient_ids': [(4, self.user_employee.partner_id.id), (4, customer.id)] }) with self.mock_mail_gateway(): mail.send() self.assertEqual(len(self._mails), 3, 'Mail: sent 3 emails: 1 for email_to, 1 / recipient') self.assertEqual( sorted(sorted(_mail['email_to']) for _mail in self._mails), sorted([sorted(['"Raoul, le Grand" ', '"Micheline, l\'immense" ']), [formataddr((self.user_employee.name, self.user_employee.email_normalized))], [formataddr(("Tony Customer", 'tony.customer@test.example.com'))] ]), 'Mail: formatting issues should have been removed as much as possible' ) # CC are added to first email self.assertEqual( [_mail['email_cc'] for _mail in self._mails], [['"Ignasse, le Poilu" '], [], []], 'Mail: currently always removing formatting in email_cc' ) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_values_email_multi(self): """ Test outgoing email values, with email field holding multi emails """ # Multi customer = self.env['res.partner'].create({ 'name': 'Tony Customer', 'email': 'tony.customer@test.example.com, norbert.customer@test.example.com', }) mail = self.env['mail.mail'].create({ 'body_html': '

Test

', 'email_cc': 'test.cc.1@test.example.com, test.cc.2@test.example.com', 'email_to': 'test.email.1@test.example.com, test.email.2@test.example.com', 'recipient_ids': [(4, self.user_employee.partner_id.id), (4, customer.id)] }) with self.mock_mail_gateway(): mail.send() self.assertEqual(len(self._mails), 3, 'Mail: sent 3 emails: 1 for email_to, 1 / recipient') self.assertEqual( sorted(sorted(_mail['email_to']) for _mail in self._mails), sorted([sorted(['test.email.1@test.example.com', 'test.email.2@test.example.com']), [formataddr((self.user_employee.name, self.user_employee.email_normalized))], sorted([formataddr(("Tony Customer", 'tony.customer@test.example.com')), formataddr(("Tony Customer", 'norbert.customer@test.example.com'))]), ]), 'Mail: formatting issues should have been removed as much as possible (multi emails in a single address are managed ' 'like separate emails when sending with recipient_ids' ) # CC are added to first email self.assertEqual( [_mail['email_cc'] for _mail in self._mails], [['test.cc.1@test.example.com', 'test.cc.2@test.example.com'], [], []], ) # Multi + formatting customer = self.env['res.partner'].create({ 'name': 'Tony Customer', 'email': 'tony.customer@test.example.com, "Norbert Customer" ', }) mail = self.env['mail.mail'].create({ 'body_html': '

Test

', 'email_cc': 'test.cc.1@test.example.com, test.cc.2@test.example.com', 'email_to': 'test.email.1@test.example.com, test.email.2@test.example.com', 'recipient_ids': [(4, self.user_employee.partner_id.id), (4, customer.id)] }) with self.mock_mail_gateway(): mail.send() self.assertEqual(len(self._mails), 3, 'Mail: sent 3 emails: 1 for email_to, 1 / recipient') self.assertEqual( sorted(sorted(_mail['email_to']) for _mail in self._mails), sorted([sorted(['test.email.1@test.example.com', 'test.email.2@test.example.com']), [formataddr((self.user_employee.name, self.user_employee.email_normalized))], sorted([formataddr(("Tony Customer", 'tony.customer@test.example.com')), formataddr(("Tony Customer", 'norbert.customer@test.example.com'))]), ]), 'Mail: formatting issues should have been removed as much as possible (multi emails in a single address are managed ' 'like separate emails when sending with recipient_ids (and partner name is always used as name part)' ) # CC are added to first email self.assertEqual( [_mail['email_cc'] for _mail in self._mails], [['test.cc.1@test.example.com', 'test.cc.2@test.example.com'], [], []], ) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_values_headers(self): """ Test headers content, notably X-Odoo-Message-Id added to keep context when going through exotic mail providers that change our message IDs. """ mail = self.env['mail.mail'].create({ 'body_html': '

Test

', 'email_to': 'test.😊@example.com', }) message_id = mail.message_id with self.mock_mail_gateway(): mail.send() self.assertEqual(len(self._mails), 1) self.assertDictEqual( self._mails[0]['headers'], { 'Return-Path': f'{self.alias_bounce}@{self.alias_domain}', 'X-Odoo-Message-Id': message_id, } ) @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_values_email_unicode(self): """ Unicode should be fine. """ mail = self.env['mail.mail'].create({ 'body_html': '

Test

', 'email_cc': 'test.😊.cc@example.com', 'email_to': 'test.😊@example.com', }) with self.mock_mail_gateway(): mail.send() self.assertEqual(len(self._mails), 1) self.assertEqual(self._mails[0]['email_cc'], ['test.😊.cc@example.com']) self.assertEqual(self._mails[0]['email_to'], ['test.😊@example.com']) @users('admin') @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_mail_values_email_uppercase(self): """ Test uppercase support when comparing emails, notably due to 'send_validated_to' introduction that checks emails before sending them. """ customer = self.env['res.partner'].create({ 'name': 'Uppercase Partner', 'email': 'Uppercase.Partner.youpie@example.gov.uni', }) for recipient_values, exp_recipients in zip( [ {'email_to': 'Uppercase.Customer.to@example.gov.uni'}, {'email_to': '"Formatted Customer" ', 'email_cc': '"UpCc" '}, {'recipient_ids': [(4, customer.id)], 'email_cc': '"UpCc" '}, ], [ [(['uppercase.customer.to@example.gov.uni'], [])], [(['"Formatted Customer" '], ['"UpCc" '])], # partner-based recipients are not mixed with emails-only, even if only CC [ (['"Uppercase Partner" '], []), ([], ['"UpCc" ']), ], ] ): with self.subTest(values=recipient_values): mail = self.env['mail.mail'].create({ 'body_html': '

Test

', 'email_from': '"Forced From" ', **recipient_values, }) with self.mock_mail_gateway(): mail.send() for exp_to, exp_cc in exp_recipients: self.assertSentEmail('"Forced From" ', exp_to, email_cc=exp_cc) @mute_logger('odoo.addons.mail.models.mail_mail') @patch('odoo.addons.base.models.ir_attachment.IrAttachment.file_size', new_callable=PropertyMock) def test_mail_mail_send_server_attachment_to_download_link(self, mock_attachment_file_size): """ Test that when the mail size exceeds the max email size limit, attachments are turned into download links added at the end of the email content. The feature is tested in the following conditions: - using a specified server or the default one (to test command ICP parameter) - in batch mode - with mail that exceed (with one or more attachments) or not the limit - with attachment owned by a business record or not: attachments not owned by a business record are never turned into links because their lifespans are not controlled by the user (might even be deleted right after the message is sent). """ def count_attachments(message): if isinstance(message, str): return 0 elif message.is_multipart(): return sum(count_attachments(part) for part in message.get_payload()) elif 'attachment' in message.get('Content-Disposition', ''): return 1 return 0 mock_attachment_file_size.return_value = 1024 * 128 # Define some constant to ease the understanding of the test test_mail_server = self.mail_server_domain_2 max_size_always_exceed = 0.1 max_size_never_exceed = 10 for n_attachment, mail_server, business_attachment, expected_is_links in ( # 1 attachment which doesn't exceed max size (1, self.env['ir.mail_server'], True, False), # 3 attachment: exceed max size (3, self.env['ir.mail_server'], True, True), # 1 attachment: exceed max size (1, self.env['ir.mail_server'], True, True), # Same as above with a specific server. Note that the default and server max_email size are reversed. (1, test_mail_server, True, False), (3, test_mail_server, True, True), (1, test_mail_server, True, True), # Attachments not linked to a business record are never turned to link (3, self.env['ir.mail_server'], False, False), (1, test_mail_server, False, False), ): # Setup max email size to check that the right maximum is used (default or mail server one) if expected_is_links: max_size_test_succeed = max_size_always_exceed * n_attachment max_size_test_fail = max_size_never_exceed else: max_size_test_succeed = max_size_never_exceed max_size_test_fail = max_size_always_exceed * n_attachment if mail_server: self.env['ir.config_parameter'].sudo().set_param('base.default_max_email_size', max_size_test_fail) mail_server.max_email_size = max_size_test_succeed else: self.env['ir.config_parameter'].sudo().set_param('base.default_max_email_size', max_size_test_succeed) attachments = self.env['ir.attachment'].sudo().create([{ 'name': f'attachment{idx_attachment}', 'res_name': 'test', 'res_model': self.test_record._name if business_attachment else 'mail.message', 'res_id': self.test_record.id if business_attachment else 0, 'datas': 'IA==', # a non-empty base64 content. We mock attachment file_size to simulate bigger size. } for idx_attachment in range(n_attachment)]) with self.mock_smtplib_connection(): mails = self.env['mail.mail'].create([{ 'attachment_ids': attachments.ids, 'body_html': '

Test

', 'email_from': 'test@test_2.com', 'email_to': f'mail_{mail_idx}@test.com', } for mail_idx in range(2)]) mails._send(mail_server=mail_server) self.assertEqual(len(self.emails), 2) for mail, outgoing_email in zip(mails, self.emails): message_raw = outgoing_email['message'] message_parsed = message_from_string(message_raw) message_cleaned = re.sub(r'[\s=]', '', message_raw) with self.subTest(n_attachment=n_attachment, mail_server=mail_server, business_attachment=business_attachment, expected_is_links=expected_is_links): if expected_is_links: self.assertEqual(count_attachments(message_parsed), 0, 'Attachments should have been removed (replaced by download links)') self.assertTrue(all(attachment.access_token for attachment in attachments), 'Original attachment should have been modified (access_token added)') self.assertTrue(all(attachment.access_token in message_cleaned for attachment in attachments), 'All attachments should have been turned into download links') else: self.assertEqual(count_attachments(message_parsed), n_attachment, 'All attachments should be present') self.assertEqual(message_cleaned.count('access_token'), 0, 'Attachments should not have been turned into download links') self.assertTrue(all(not attachment.access_token for attachment in attachments), 'Original attachment should not have been modified (access_token not added)') @tagged('mail_mail') class TestMailMailRace(common.TransactionCase): @mute_logger('odoo.addons.mail.models.mail_mail') def test_mail_bounce_during_send(self): cr = self.registry.cursor() env = api.Environment(cr, SUPERUSER_ID, {}) self.partner = env['res.partner'].create({ 'name': 'Ernest Partner', }) # we need to simulate a mail sent by the cron task, first create mail, message and notification by hand mail = env['mail.mail'].sudo().create({ 'body_html': '

Test

', 'is_notification': True, 'state': 'outgoing', 'recipient_ids': [(4, self.partner.id)] }) mail_message = mail.mail_message_id message = env['mail.message'].create({ 'subject': 'S', 'body': 'B', 'subtype_id': self.ref('mail.mt_comment'), 'notification_ids': [(0, 0, { 'res_partner_id': self.partner.id, 'mail_mail_id': mail.id, 'notification_type': 'email', 'is_read': True, 'notification_status': 'ready', })], }) notif = env['mail.notification'].search([('res_partner_id', '=', self.partner.id)]) # we need to commit transaction or cr will keep the lock on notif cr.commit() # patch send_email in order to create a concurent update and check the notif is already locked by _send() this = self # coding in javascript ruinned my life bounce_deferred = [] @api.model def send_email(self, message, *args, **kwargs): with this.registry.cursor() as cr, mute_logger('odoo.sql_db'): try: # try ro aquire lock (no wait) on notification (should fail) cr.execute("SELECT notification_status FROM mail_notification WHERE id = %s FOR UPDATE NOWAIT", [notif.id]) except psycopg2.OperationalError: # record already locked by send, all good bounce_deferred.append(True) else: # this should trigger psycopg2.extensions.TransactionRollbackError in send(). # Only here to simulate the initial use case # If the record is lock, this line would create a deadlock since we are in the same thread # In practice, the update will wait the end of the send() transaction and set the notif as bounce, as expeced cr.execute("UPDATE mail_notification SET notification_status='bounce' WHERE id = %s", [notif.id]) return message['Message-Id'] self.patch(self.registry['ir.mail_server'], 'send_email', send_email) mail.send() self.assertTrue(bounce_deferred, "The bounce should have been deferred") self.assertEqual(notif.notification_status, 'sent') # some cleaning since we commited the cr notif.unlink() mail.unlink() (mail_message | message).unlink() self.partner.unlink() cr.commit() cr.close()