Odoo18-Base/addons/test_mail/tests/test_mail_mail.py
2025-01-06 10:57:38 +07:00

1109 lines
56 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import psycopg2
import pytz
import re
import smtplib
from email import message_from_string
from datetime import datetime, timedelta
from freezegun import freeze_time
from markupsafe import Markup
from OpenSSL.SSL import Error as SSLError
from socket import gaierror, timeout
from unittest.mock import call, patch, PropertyMock
from odoo import api, Command, fields, SUPERUSER_ID
from odoo.addons.base.models.ir_mail_server import MailDeliveryException
from odoo.addons.mail.tests.common import MailCommon
from odoo.exceptions import AccessError
from odoo.tests import common, tagged, users
from odoo.tools import formataddr, mute_logger
@tagged('mail_mail')
class TestMailMail(MailCommon):
@classmethod
def setUpClass(cls):
super(TestMailMail, cls).setUpClass()
cls.test_record = cls.env['mail.test.gateway'].with_context(cls._test_context).create({
'name': 'Test',
'email_from': 'ignasse@example.com',
}).with_context({})
cls.test_message = cls.test_record.message_post(body=Markup('<p>Message</p>'), subject='Subject')
cls.test_mail = cls.env['mail.mail'].create([{
'body': Markup('<p>Body</p>'),
'email_from': False,
'email_to': 'test@example.com',
'is_notification': True,
'subject': 'Subject',
}])
cls.test_notification = cls.env['mail.notification'].create({
'is_read': False,
'mail_mail_id': cls.test_mail.id,
'mail_message_id': cls.test_message.id,
'notification_type': 'email',
'res_partner_id': cls.partner_employee.id, # not really used for matching except multi-recipients
})
cls.emails_falsy = [False, '', ' ']
cls.emails_invalid = ['buggy', 'buggy, wrong']
cls.emails_invalid_ascii = ['raoul@example¢¡.com']
cls.emails_valid = ['raoul¢¡@example.com', 'raoul@example.com']
def _reset_data(self):
self._init_mail_mock()
self.test_mail.write({'failure_reason': False, 'failure_type': False, 'state': 'outgoing'})
self.test_notification.write({'failure_reason': False, 'failure_type': False, 'notification_status': 'ready'})
@users('admin')
def test_mail_mail_attachment_access(self):
mail = self.env['mail.mail'].create({
'body_html': 'Test',
'email_to': 'test@example.com',
'partner_ids': [(4, self.user_employee.partner_id.id)],
'attachment_ids': [
(0, 0, {'name': 'file 1', 'datas': 'c2VjcmV0'}),
(0, 0, {'name': 'file 2', 'datas': 'c2VjcmV0'}),
(0, 0, {'name': 'file 3', 'datas': 'c2VjcmV0'}),
(0, 0, {'name': 'file 4', 'datas': 'c2VjcmV0'}),
],
})
def _patched_check(self, *args, **kwargs):
if self.env.is_superuser():
return
if any(attachment.name in ('file 2', 'file 4') for attachment in self):
raise AccessError('No access')
mail.invalidate_recordset()
new_attachment = self.env['ir.attachment'].create({
'name': 'new file',
'datas': 'c2VjcmV0',
})
with patch.object(type(self.env['ir.attachment']), 'check', _patched_check):
# Sanity check
self.assertEqual(mail.restricted_attachment_count, 2)
self.assertEqual(len(mail.unrestricted_attachment_ids), 2)
self.assertEqual(mail.unrestricted_attachment_ids.mapped('name'), ['file 1', 'file 3'])
# Add a new attachment
mail.write({
'unrestricted_attachment_ids': [Command.link(new_attachment.id)],
})
self.assertEqual(mail.restricted_attachment_count, 2)
self.assertEqual(len(mail.unrestricted_attachment_ids), 3)
self.assertEqual(mail.unrestricted_attachment_ids.mapped('name'), ['file 1', 'file 3', 'new file'])
self.assertEqual(len(mail.attachment_ids), 5)
# Remove an attachment
mail.write({
'unrestricted_attachment_ids': [Command.unlink(new_attachment.id)],
})
self.assertEqual(mail.restricted_attachment_count, 2)
self.assertEqual(len(mail.unrestricted_attachment_ids), 2)
self.assertEqual(mail.unrestricted_attachment_ids.mapped('name'), ['file 1', 'file 3'])
self.assertEqual(len(mail.attachment_ids), 4)
# Reset command
mail.invalidate_recordset()
mail.write({'unrestricted_attachment_ids': [Command.clear()]})
self.assertEqual(len(mail.unrestricted_attachment_ids), 0)
self.assertEqual(len(mail.attachment_ids), 2)
# Read in SUDO
mail.invalidate_recordset()
self.assertEqual(mail.sudo().restricted_attachment_count, 2)
self.assertEqual(len(mail.sudo().unrestricted_attachment_ids), 0)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_headers(self):
""" Test headers management when set on outgoing mail. """
# mail without thread-enabled record
base_values = {
'body_html': '<p>Test</p>',
'email_to': 'test@example.com',
'headers': {'foo': 'bar'},
}
for headers, expected in [
({'foo': 'bar'}, {'foo': 'bar'}),
("{'foo': 'bar'}", {'foo': 'bar'}),
("{'foo': 'bar', 'baz': '3+2'}", {'foo': 'bar', 'baz': '3+2'}),
(['not_a_dict'], {}),
('alsonotadict', {}),
("['not_a_dict']", {}),
("{'invaliddict'}", {}),
]:
with self.subTest(headers=headers, expected=expected):
mail = self.env['mail.mail'].create([
dict(base_values, headers=headers)
])
with self.mock_mail_gateway():
mail.send()
for key, value in expected.items():
self.assertIn(key, self._mails[0]['headers'])
self.assertEqual(self._mails[0]['headers'][key], value)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_recipients(self):
""" Partner_ids is a field used from mail_message, but not from mail_mail. """
mail = self.env['mail.mail'].sudo().create({
'body_html': '<p>Test</p>',
'email_to': 'test@example.com',
'partner_ids': [(4, self.user_employee.partner_id.id)]
})
with self.mock_mail_gateway():
mail.send()
self.assertSentEmail(mail.env.user.partner_id, ['test@example.com'])
self.assertEqual(len(self._mails), 1)
mail = self.env['mail.mail'].sudo().create({
'body_html': '<p>Test</p>',
'email_to': 'test@example.com',
'recipient_ids': [(4, self.user_employee.partner_id.id)],
})
with self.mock_mail_gateway():
mail.send()
self.assertSentEmail(mail.env.user.partner_id, ['test@example.com'])
self.assertSentEmail(mail.env.user.partner_id, [self.user_employee.email_formatted])
self.assertEqual(len(self._mails), 2)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_recipients_cc(self):
""" Partner_ids is a field used from mail_message, but not from mail_mail. """
mail = self.env['mail.mail'].sudo().create({
'body_html': '<p>Test</p>',
'email_cc': 'test.cc.1@example.com, "Herbert" <test.cc.2@example.com>',
'email_to': 'test.rec.1@example.com, "Raoul" <test.rec.2@example.com>',
'recipient_ids': [(4, self.user_employee.partner_id.id)],
})
with self.mock_mail_gateway():
mail.send()
# note that formatting is lost for cc
self.assertSentEmail(mail.env.user.partner_id,
['test.rec.1@example.com', '"Raoul" <test.rec.2@example.com>'],
email_cc=['test.cc.1@example.com', '"Herbert" <test.cc.2@example.com>'])
# don't put CCs as copy of each outgoing email, only the first one (and never
# with partner based recipients as those may receive specific links)
self.assertSentEmail(mail.env.user.partner_id, [self.user_employee.email_formatted],
email_cc=[])
self.assertEqual(len(self._mails), 2)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_recipients_formatting(self):
""" Check support of email / formatted email """
mail = self.env['mail.mail'].sudo().create({
'author_id': False,
'body_html': '<p>Test</p>',
'email_cc': 'test.cc.1@example.com, "Herbert" <test.cc.2@example.com>',
'email_from': '"Ignasse" <test.from@example.com>',
'email_to': 'test.rec.1@example.com, "Raoul" <test.rec.2@example.com>',
})
with self.mock_mail_gateway():
mail.send()
# note that formatting is lost for cc
self.assertSentEmail('"Ignasse" <test.from@example.com>',
['test.rec.1@example.com', '"Raoul" <test.rec.2@example.com>'],
email_cc=['test.cc.1@example.com', '"Herbert" <test.cc.2@example.com>'])
self.assertEqual(len(self._mails), 1)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_return_path(self):
# mail without thread-enabled record
base_values = {
'body_html': '<p>Test</p>',
'email_to': 'test@example.com',
}
mail = self.env['mail.mail'].create(base_values)
with self.mock_mail_gateway():
mail.send()
self.assertEqual(self._mails[0]['headers']['Return-Path'], '%s@%s' % (self.alias_bounce, self.alias_domain))
# mail on thread-enabled record
mail = self.env['mail.mail'].create(dict(base_values, **{
'model': self.test_record._name,
'res_id': self.test_record.id,
}))
with self.mock_mail_gateway():
mail.send()
self.assertEqual(self._mails[0]['headers']['Return-Path'], '%s@%s' % (self.alias_bounce, self.alias_domain))
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.tests')
def test_mail_mail_schedule(self):
"""Test that a mail scheduled in the past/future are sent or not"""
now = datetime(2022, 6, 28, 14, 0, 0)
scheduled_datetimes = [
# falsy values
False, '', 'This is not a date format',
# datetimes (UTC/GMT +10 hours for Australia/Brisbane)
now, pytz.timezone('Australia/Brisbane').localize(now),
# string
fields.Datetime.to_string(now - timedelta(days=1)),
fields.Datetime.to_string(now + timedelta(days=1)),
(now + timedelta(days=1)).strftime("%H:%M:%S %d-%m-%Y"),
# tz: is actually 1 hour before now in UTC
(now + timedelta(hours=3)).strftime("%H:%M:%S %d-%m-%Y") + " +0400",
# tz: is actually 1 hour after now in UTC
(now + timedelta(hours=-3)).strftime("%H:%M:%S %d-%m-%Y") + " -0400",
]
expected_datetimes = [
False, False, False,
now, now - pytz.timezone('Australia/Brisbane').utcoffset(now),
now - timedelta(days=1), now + timedelta(days=1), now + timedelta(days=1),
now + timedelta(hours=-1),
now + timedelta(hours=1),
]
expected_states = [
# falsy values = send now
'sent', 'sent', 'sent',
'sent', 'sent',
'sent', 'outgoing', 'outgoing',
'sent', 'outgoing'
]
mails = self.env['mail.mail'].create([
{'body_html': '<p>Test</p>',
'email_to': 'test@example.com',
'scheduled_date': scheduled_datetime,
} for scheduled_datetime in scheduled_datetimes
])
for mail, expected_datetime, scheduled_datetime in zip(mails, expected_datetimes, scheduled_datetimes):
self.assertEqual(mail.scheduled_date, expected_datetime,
'Scheduled date: %s should be stored as %s, received %s' % (scheduled_datetime, expected_datetime, mail.scheduled_date))
self.assertEqual(mail.state, 'outgoing')
with freeze_time(now):
self.env['mail.mail'].process_email_queue()
for mail, expected_state in zip(mails, expected_states):
self.assertEqual(mail.state, expected_state)
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.tests')
def test_mail_mail_send_configuration(self):
""" Test configuration and control of email queue """
self.env['mail.mail'].search([]).unlink() # cleanup queue
# test 'mail.mail.queue.batch.size': cron fetch size
for queue_batch_size, exp_send_count in [
(3, 3),
(0, 10), # maximum available
(False, 10), # maximum available
]:
with self.subTest(queue_batch_size=queue_batch_size), \
self.mock_mail_gateway():
self.env['ir.config_parameter'].sudo().set_param('mail.mail.queue.batch.size', queue_batch_size)
mails = self.env['mail.mail'].create([
{
'auto_delete': False,
'body_html': f'Batch Email {idx}',
'email_from': 'test.from@mycompany.example.com',
'email_to': 'test.outgoing@test.example.com',
'state': 'outgoing',
}
for idx in range(10)
])
self.env['mail.mail'].process_email_queue()
self.assertEqual(len(self._mails), exp_send_count)
mails.write({'state': 'sent'}) # avoid conflicts between batch
# test 'mail.session.batch.size': batch send size
self.env['ir.config_parameter'].sudo().set_param('mail.mail.queue.batch.size', False)
for session_batch_size, exp_call_count in [
(3, 4), # 10 mails -> 4 iterations of 3
(0, 1),
(False, 1),
]:
with self.subTest(session_batch_size=session_batch_size), \
self.mock_mail_gateway():
self.env['ir.config_parameter'].sudo().set_param('mail.session.batch.size', session_batch_size)
mails = self.env['mail.mail'].create([
{
'auto_delete': False,
'body_html': f'Batch Email {idx}',
'email_from': 'test.from@mycompany.example.com',
'email_to': 'test.outgoing@test.example.com',
'state': 'outgoing',
}
for idx in range(10)
])
self.env['mail.mail'].process_email_queue()
self.assertEqual(self.mail_mail_private_send_mocked.call_count, exp_call_count)
mails.write({'state': 'sent'}) # avoid conflicts between batch
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_send_exceptions_origin(self):
""" Test various use case with exceptions and errors and see how they are
managed and stored at mail and notification level. """
mail, notification = self.test_mail, self.test_notification
# MailServer.build_email(): invalid from (missing)
for default_from in [False, '']:
self.mail_alias_domain.default_from = default_from
self._reset_data()
with self.mock_mail_gateway(), mute_logger('odoo.addons.mail.models.mail_mail'):
mail.send(raise_exception=False)
self.assertFalse(self._mails[0]['email_from'])
self.assertEqual(
mail.failure_reason,
'You must either provide a sender address explicitly or configure using the combination of `mail.catchall.domain` and `mail.default.from` ICPs, in the server configuration file or with the --email-from startup parameter.')
self.assertEqual(mail.failure_type, 'mail_from_missing')
self.assertEqual(mail.state, 'exception')
self.assertEqual(
notification.failure_reason,
'You must either provide a sender address explicitly or configure using the combination of `mail.catchall.domain` and `mail.default.from` ICPs, in the server configuration file or with the --email-from startup parameter.')
self.assertEqual(notification.failure_type, 'mail_from_missing')
self.assertEqual(notification.notification_status, 'exception')
# MailServer.send_email(): _prepare_email_message: unexpected ASCII / Malformed 'Return-Path' or 'From' address
# Force bounce alias to void, will force usage of email_from
self.mail_alias_domain.bounce_alias = False
self.env.company.invalidate_recordset(fnames={'bounce_email', 'bounce_formatted'})
for email_from in ['strange@example¢¡.com', 'robert']:
self._reset_data()
mail.write({'email_from': email_from})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(self._mails[0]['email_from'], email_from)
self.assertEqual(mail.failure_reason, f"Malformed 'Return-Path' or 'From' address: {email_from} - It should contain one valid plain ASCII email")
self.assertEqual(mail.failure_type, 'mail_from_invalid')
self.assertEqual(mail.state, 'exception')
self.assertEqual(notification.failure_reason, f"Malformed 'Return-Path' or 'From' address: {email_from} - It should contain one valid plain ASCII email")
self.assertEqual(notification.failure_type, 'mail_from_invalid')
self.assertEqual(notification.notification_status, 'exception')
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_send_exceptions_recipients_emails(self):
""" Test various use case with exceptions and errors and see how they are
managed and stored at mail and notification level. """
mail, notification = self.test_mail, self.test_notification
# MailServer.send_email(): _prepare_email_message: missing To
for email_to in self.emails_falsy:
self._reset_data()
mail.write({'email_to': email_to})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.')
self.assertFalse(mail.failure_type, 'Mail: missing email_to: no failure type, should be updated')
self.assertEqual(mail.state, 'exception')
if email_to == ' ':
self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated')
self.assertEqual(notification.failure_type, 'mail_email_missing')
self.assertEqual(notification.notification_status, 'exception')
else:
self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated')
self.assertEqual(notification.failure_type, False, 'Mail: missing email_to: notification is wrongly set as sent')
self.assertEqual(notification.notification_status, 'sent', 'Mail: missing email_to: notification is wrongly set as sent')
# MailServer.send_email(): _prepare_email_message: invalid To
for email_to, failure_type in zip(
self.emails_invalid,
['mail_email_missing', 'mail_email_missing']
):
self._reset_data()
mail.write({'email_to': email_to})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.')
self.assertFalse(mail.failure_type, 'Mail: invalid email_to: no failure type, should be updated')
self.assertEqual(mail.state, 'exception')
self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated')
self.assertEqual(notification.failure_type, failure_type, 'Mail: invalid email_to: missing instead of invalid')
self.assertEqual(notification.notification_status, 'exception')
# MailServer.send_email(): _prepare_email_message: invalid To (ascii)
for email_to in self.emails_invalid_ascii:
self._reset_data()
mail.write({'email_to': email_to})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.')
self.assertFalse(mail.failure_type, 'Mail: invalid (ascii) recipient partner: no failure type, should be updated')
self.assertEqual(mail.state, 'exception')
self.assertEqual(notification.failure_type, 'mail_email_invalid')
self.assertEqual(notification.notification_status, 'exception')
# MailServer.send_email(): _prepare_email_message: ok To (ascii or just ok)
for email_to in self.emails_valid:
self._reset_data()
mail.write({'email_to': email_to})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertFalse(mail.failure_reason)
self.assertFalse(mail.failure_type)
self.assertEqual(mail.state, 'sent')
self.assertFalse(notification.failure_reason)
self.assertFalse(notification.failure_type)
self.assertEqual(notification.notification_status, 'sent')
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_send_exceptions_recipients_partners(self):
""" Test various use case with exceptions and errors and see how they are
managed and stored at mail and notification level. """
mail, notification = self.test_mail, self.test_notification
mail.write({'email_from': 'test.user@test.example.com', 'email_to': False})
partners_falsy = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_falsy
])
partners_invalid = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_invalid
])
partners_invalid_ascii = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_invalid_ascii
])
partners_valid = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_valid
])
# void values
for partner in partners_falsy:
self._reset_data()
mail.write({'recipient_ids': [(5, 0), (4, partner.id)]})
notification.write({'res_partner_id': partner.id})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.')
self.assertFalse(mail.failure_type, 'Mail: void recipient partner: no failure type, should be updated')
self.assertEqual(mail.state, 'exception')
self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated')
self.assertEqual(notification.failure_type, 'mail_email_invalid', 'Mail: void recipient partner: should be missing, not invalid')
self.assertEqual(notification.notification_status, 'exception')
# wrong values
for partner in partners_invalid:
self._reset_data()
mail.write({'recipient_ids': [(5, 0), (4, partner.id)]})
notification.write({'res_partner_id': partner.id})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.')
self.assertFalse(mail.failure_type, 'Mail: invalid recipient partner: no failure type, should be updated')
self.assertEqual(mail.state, 'exception')
self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated')
self.assertEqual(notification.failure_type, 'mail_email_invalid')
self.assertEqual(notification.notification_status, 'exception')
# ascii ko
for partner in partners_invalid_ascii:
self._reset_data()
mail.write({'recipient_ids': [(5, 0), (4, partner.id)]})
notification.write({'res_partner_id': partner.id})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, 'Error without exception. Probably due to sending an email without computed recipients.')
self.assertFalse(mail.failure_type, 'Mail: invalid (ascii) recipient partner: no failure type, should be updated')
self.assertEqual(mail.state, 'exception')
self.assertEqual(notification.failure_type, 'mail_email_invalid')
self.assertEqual(notification.notification_status, 'exception')
# ascii ok or just ok
for partner in partners_valid:
self._reset_data()
mail.write({'recipient_ids': [(5, 0), (4, partner.id)]})
notification.write({'res_partner_id': partner.id})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertFalse(mail.failure_reason)
self.assertFalse(mail.failure_type)
self.assertEqual(mail.state, 'sent')
self.assertFalse(notification.failure_reason)
self.assertFalse(notification.failure_type)
self.assertEqual(notification.notification_status, 'sent')
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_send_exceptions_recipients_partners_mixed(self):
""" Test various use case with exceptions and errors and see how they are
managed and stored at mail and notification level. """
mail, notification = self.test_mail, self.test_notification
mail.write({'email_to': 'test@example.com'})
partners_falsy = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_falsy
])
partners_invalid = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_invalid
])
partners_valid = self.env['res.partner'].create([
{'name': 'Name %s' % email, 'email': email}
for email in self.emails_valid
])
# valid to, missing email for recipient or wrong email for recipient
for partner in partners_falsy + partners_invalid:
self._reset_data()
mail.write({'recipient_ids': [(5, 0), (4, partner.id)]})
notification.write({'res_partner_id': partner.id})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertFalse(mail.failure_reason, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertFalse(mail.failure_type, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertEqual(mail.state, 'sent', 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertFalse(notification.failure_reason, 'Mail: void email considered as invalid')
self.assertEqual(notification.failure_type, 'mail_email_invalid', 'Mail: void email considered as invalid')
self.assertEqual(notification.notification_status, 'exception')
# update to have valid partner and invalid partner
mail.write({'recipient_ids': [(5, 0), (4, partners_valid[1].id), (4, partners_falsy[0].id)]})
notification.write({'res_partner_id': partners_valid[1].id})
notification2 = notification.create({
'is_read': False,
'mail_mail_id': mail.id,
'mail_message_id': self.test_message.id,
'notification_type': 'email',
'res_partner_id': partners_falsy[0].id,
})
# missing to / invalid to
for email_to in self.emails_falsy + self.emails_invalid:
self._reset_data()
notification2.write({'failure_reason': False, 'failure_type': False, 'notification_status': 'ready'})
mail.write({'email_to': email_to})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertFalse(mail.failure_reason, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertFalse(mail.failure_type, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertEqual(mail.state, 'sent', 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertFalse(notification.failure_reason)
self.assertFalse(notification.failure_type)
self.assertEqual(notification.notification_status, 'sent')
self.assertFalse(notification2.failure_reason)
self.assertEqual(notification2.failure_type, 'mail_email_invalid')
self.assertEqual(notification2.notification_status, 'exception')
# buggy to (ascii)
for email_to in self.emails_invalid_ascii:
self._reset_data()
notification2.write({'failure_reason': False, 'failure_type': False, 'notification_status': 'ready'})
mail.write({'email_to': email_to})
with self.mock_mail_gateway():
mail.send(raise_exception=False)
self.assertFalse(mail.failure_type, 'Mail: at least one valid recipient, mail is sent to avoid send loops and spam')
self.assertEqual(mail.state, 'sent')
self.assertFalse(notification.failure_type)
self.assertEqual(notification.notification_status, 'sent')
self.assertEqual(notification2.failure_type, 'mail_email_invalid')
self.assertEqual(notification2.notification_status, 'exception')
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_send_exceptions_raise_management(self):
""" Test various use case with exceptions and errors and see how they are
managed and stored at mail and notification level. """
mail, notification = self.test_mail, self.test_notification
mail.write({'email_from': 'test.user@test.example.com', 'email_to': 'test@example.com'})
# SMTP connecting issues
with self.mock_mail_gateway():
_connect_current = self.connect_mocked.side_effect
# classic errors that may be raised during sending, just to test their current support
for error, msg in [
(smtplib.SMTPServerDisconnected('SMTPServerDisconnected'), 'SMTPServerDisconnected'),
(smtplib.SMTPResponseException('code', 'SMTPResponseException'), 'code\nSMTPResponseException'),
(smtplib.SMTPNotSupportedError('SMTPNotSupportedError'), 'SMTPNotSupportedError'),
(smtplib.SMTPException('SMTPException'), 'SMTPException'),
(SSLError('SSLError'), 'SSLError'),
(gaierror('gaierror'), 'gaierror'),
(timeout('timeout'), 'timeout')]:
def _connect(*args, **kwargs):
raise error
self.connect_mocked.side_effect = _connect
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, msg)
self.assertFalse(mail.failure_type)
self.assertEqual(mail.state, 'exception')
self.assertFalse(notification.failure_reason, 'Mail: failure reason not propagated')
self.assertEqual(notification.failure_type, 'mail_smtp')
self.assertEqual(notification.notification_status, 'exception')
self._reset_data()
self.connect_mocked.side_effect = _connect_current
# SMTP sending issues
with self.mock_mail_gateway():
_send_current = self.send_email_mocked.side_effect
self._reset_data()
mail.write({'email_to': 'test@example.com'})
# should always raise for those errors, even with raise_exception=False
for error, error_class in [
(smtplib.SMTPServerDisconnected("Some exception"), smtplib.SMTPServerDisconnected),
(MemoryError("Some exception"), MemoryError)]:
def _send_email(*args, **kwargs):
raise error
self.send_email_mocked.side_effect = _send_email
with self.assertRaises(error_class):
mail.send(raise_exception=False)
self.assertFalse(mail.failure_reason, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback')
self.assertFalse(mail.failure_type, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback')
self.assertEqual(mail.state, 'outgoing', 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback')
self.assertFalse(notification.failure_reason, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback')
self.assertFalse(notification.failure_type, 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback')
self.assertEqual(notification.notification_status, 'ready', 'SMTPServerDisconnected/MemoryError during Send raises and lead to a rollback')
# MailDeliveryException: should be catched; other issues are sub-catched under
# a MailDeliveryException and are catched
for error, msg in [
(MailDeliveryException("Some exception"), 'Some exception'),
(ValueError("Unexpected issue"), 'Unexpected issue')]:
def _send_email(*args, **kwargs):
raise error
self.send_email_mocked.side_effect = _send_email
self._reset_data()
mail.send(raise_exception=False)
self.assertEqual(mail.failure_reason, msg)
self.assertEqual(mail.failure_type, 'unknown', 'Mail: unlogged failure type to fix')
self.assertEqual(mail.state, 'exception')
self.assertEqual(notification.failure_reason, msg)
self.assertEqual(notification.failure_type, 'unknown', 'Mail: generic failure type')
self.assertEqual(notification.notification_status, 'exception')
self.send_email_mocked.side_effect = _send_current
def test_mail_mail_values_misc(self):
""" Test various values on mail.mail, notably default values """
msg = self.env['mail.mail'].create({})
self.assertEqual(msg.message_type, 'email_outgoing', 'Mails should have outgoing email type by default')
@tagged('mail_mail', 'mail_server')
class TestMailMailServer(MailCommon):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.mail_server_domain_2 = cls.env['ir.mail_server'].create({
'from_filter': 'test_2.com',
'name': 'Server 2',
'smtp_host': 'test_2.com',
})
cls.test_record = cls.env['mail.test.gateway'].with_context(cls._test_context).create({
'name': 'Test',
'email_from': 'ignasse@example.com',
}).with_context({})
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_send_server(self):
"""Test that the mails are send in batch.
Batch are defined by the mail server and the email from field.
"""
self.assertEqual(
self.env['ir.mail_server']._get_default_from_address(),
f'{self.default_from}@{self.alias_domain}'
)
mail_values = {
'body_html': '<p>Test</p>',
'email_to': 'user@example.com',
}
# Should be encapsulated in the notification email
mails = self.env['mail.mail'].create([{
**mail_values,
'email_from': 'test@unknown_domain.com',
} for _ in range(5)]) | self.env['mail.mail'].create([{
**mail_values,
'email_from': 'test_2@unknown_domain.com',
} for _ in range(5)])
# Should use the test_2 mail server
# Once with "user_1@test_2.com" as login
# Once with "user_2@test_2.com" as login
mails += self.env['mail.mail'].create([{
**mail_values,
'email_from': 'user_1@test_2.com',
} for _ in range(5)]) + self.env['mail.mail'].create([{
**mail_values,
'email_from': 'user_2@test_2.com',
} for _ in range(5)])
# Mail server is forced
mails += self.env['mail.mail'].create([{
**mail_values,
'email_from': 'user_1@test_2.com',
'mail_server_id': self.mail_server_domain.id,
} for _ in range(5)])
with self.mock_smtplib_connection():
mails.send()
self.assertEqual(self.find_mail_server_mocked.call_count, 4, 'Must be called only once per "mail from" when the mail server is not forced')
self.assertEqual(len(self.emails), 25)
# Check call to the connect method to ensure that we authenticate
# to the right mail server with the right login
self.assertEqual(self.connect_mocked.call_count, 4, 'Must be called once per batch which share the same mail server and the same smtp from')
self.connect_mocked.assert_has_calls(
calls=[
call(smtp_from=f'{self.default_from}@{self.alias_domain}', mail_server_id=self.mail_server_notification.id),
call(smtp_from='user_1@test_2.com', mail_server_id=self.mail_server_domain_2.id),
call(smtp_from='user_2@test_2.com', mail_server_id=self.mail_server_domain_2.id),
call(smtp_from='user_1@test_2.com', mail_server_id=self.mail_server_domain.id),
],
any_order=True,
)
self.assertSMTPEmailsSent(message_from=f'"test" <{self.default_from}@{self.alias_domain}>',
emails_count=5, from_filter=self.mail_server_notification.from_filter)
self.assertSMTPEmailsSent(message_from=f'"test_2" <{self.default_from}@{self.alias_domain}>',
emails_count=5, from_filter=self.mail_server_notification.from_filter)
self.assertSMTPEmailsSent(message_from='user_1@test_2.com', emails_count=5, mail_server=self.mail_server_domain_2)
self.assertSMTPEmailsSent(message_from='user_2@test_2.com', emails_count=5, mail_server=self.mail_server_domain_2)
self.assertSMTPEmailsSent(message_from='user_1@test_2.com', emails_count=5, mail_server=self.mail_server_domain)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_values_email_formatted(self):
""" Test outgoing email values, with formatting """
customer = self.env['res.partner'].create({
'name': 'Tony Customer',
'email': '"Formatted Emails" <tony.customer@test.example.com>',
})
mail = self.env['mail.mail'].create({
'body_html': '<p>Test</p>',
'email_cc': '"Ignasse, le Poilu" <test.cc.1@test.example.com>',
'email_to': '"Raoul, le Grand" <test.email.1@test.example.com>, "Micheline, l\'immense" <test.email.2@test.example.com>',
'recipient_ids': [(4, self.user_employee.partner_id.id), (4, customer.id)]
})
with self.mock_mail_gateway():
mail.send()
self.assertEqual(len(self._mails), 3, 'Mail: sent 3 emails: 1 for email_to, 1 / recipient')
self.assertEqual(
sorted(sorted(_mail['email_to']) for _mail in self._mails),
sorted([sorted(['"Raoul, le Grand" <test.email.1@test.example.com>', '"Micheline, l\'immense" <test.email.2@test.example.com>']),
[formataddr((self.user_employee.name, self.user_employee.email_normalized))],
[formataddr(("Tony Customer", 'tony.customer@test.example.com'))]
]),
'Mail: formatting issues should have been removed as much as possible'
)
# CC are added to first email
self.assertEqual(
[_mail['email_cc'] for _mail in self._mails],
[['"Ignasse, le Poilu" <test.cc.1@test.example.com>'], [], []],
'Mail: currently always removing formatting in email_cc'
)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_values_email_multi(self):
""" Test outgoing email values, with email field holding multi emails """
# Multi
customer = self.env['res.partner'].create({
'name': 'Tony Customer',
'email': 'tony.customer@test.example.com, norbert.customer@test.example.com',
})
mail = self.env['mail.mail'].create({
'body_html': '<p>Test</p>',
'email_cc': 'test.cc.1@test.example.com, test.cc.2@test.example.com',
'email_to': 'test.email.1@test.example.com, test.email.2@test.example.com',
'recipient_ids': [(4, self.user_employee.partner_id.id), (4, customer.id)]
})
with self.mock_mail_gateway():
mail.send()
self.assertEqual(len(self._mails), 3, 'Mail: sent 3 emails: 1 for email_to, 1 / recipient')
self.assertEqual(
sorted(sorted(_mail['email_to']) for _mail in self._mails),
sorted([sorted(['test.email.1@test.example.com', 'test.email.2@test.example.com']),
[formataddr((self.user_employee.name, self.user_employee.email_normalized))],
sorted([formataddr(("Tony Customer", 'tony.customer@test.example.com')),
formataddr(("Tony Customer", 'norbert.customer@test.example.com'))]),
]),
'Mail: formatting issues should have been removed as much as possible (multi emails in a single address are managed '
'like separate emails when sending with recipient_ids'
)
# CC are added to first email
self.assertEqual(
[_mail['email_cc'] for _mail in self._mails],
[['test.cc.1@test.example.com', 'test.cc.2@test.example.com'], [], []],
)
# Multi + formatting
customer = self.env['res.partner'].create({
'name': 'Tony Customer',
'email': 'tony.customer@test.example.com, "Norbert Customer" <norbert.customer@test.example.com>',
})
mail = self.env['mail.mail'].create({
'body_html': '<p>Test</p>',
'email_cc': 'test.cc.1@test.example.com, test.cc.2@test.example.com',
'email_to': 'test.email.1@test.example.com, test.email.2@test.example.com',
'recipient_ids': [(4, self.user_employee.partner_id.id), (4, customer.id)]
})
with self.mock_mail_gateway():
mail.send()
self.assertEqual(len(self._mails), 3, 'Mail: sent 3 emails: 1 for email_to, 1 / recipient')
self.assertEqual(
sorted(sorted(_mail['email_to']) for _mail in self._mails),
sorted([sorted(['test.email.1@test.example.com', 'test.email.2@test.example.com']),
[formataddr((self.user_employee.name, self.user_employee.email_normalized))],
sorted([formataddr(("Tony Customer", 'tony.customer@test.example.com')),
formataddr(("Tony Customer", 'norbert.customer@test.example.com'))]),
]),
'Mail: formatting issues should have been removed as much as possible (multi emails in a single address are managed '
'like separate emails when sending with recipient_ids (and partner name is always used as name part)'
)
# CC are added to first email
self.assertEqual(
[_mail['email_cc'] for _mail in self._mails],
[['test.cc.1@test.example.com', 'test.cc.2@test.example.com'], [], []],
)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_values_headers(self):
""" Test headers content, notably X-Odoo-Message-Id added to keep context
when going through exotic mail providers that change our message IDs. """
mail = self.env['mail.mail'].create({
'body_html': '<p>Test</p>',
'email_to': 'test.😊@example.com',
})
message_id = mail.message_id
with self.mock_mail_gateway():
mail.send()
self.assertEqual(len(self._mails), 1)
self.assertDictEqual(
self._mails[0]['headers'],
{
'Return-Path': f'{self.alias_bounce}@{self.alias_domain}',
'X-Odoo-Message-Id': message_id,
}
)
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_values_email_unicode(self):
""" Unicode should be fine. """
mail = self.env['mail.mail'].create({
'body_html': '<p>Test</p>',
'email_cc': 'test.😊.cc@example.com',
'email_to': 'test.😊@example.com',
})
with self.mock_mail_gateway():
mail.send()
self.assertEqual(len(self._mails), 1)
self.assertEqual(self._mails[0]['email_cc'], ['test.😊.cc@example.com'])
self.assertEqual(self._mails[0]['email_to'], ['test.😊@example.com'])
@users('admin')
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_mail_values_email_uppercase(self):
""" Test uppercase support when comparing emails, notably due to
'send_validated_to' introduction that checks emails before sending them. """
customer = self.env['res.partner'].create({
'name': 'Uppercase Partner',
'email': 'Uppercase.Partner.youpie@example.gov.uni',
})
for recipient_values, exp_recipients in zip(
[
{'email_to': 'Uppercase.Customer.to@example.gov.uni'},
{'email_to': '"Formatted Customer" <Uppercase.Customer.to@example.gov.uni>', 'email_cc': '"UpCc" <Uppercase.Customer.cc@example.gov.uni>'},
{'recipient_ids': [(4, customer.id)], 'email_cc': '"UpCc" <Uppercase.Customer.cc@example.gov.uni>'},
], [
[(['uppercase.customer.to@example.gov.uni'], [])],
[(['"Formatted Customer" <uppercase.customer.to@example.gov.uni>'], ['"UpCc" <uppercase.customer.cc@example.gov.uni>'])],
# partner-based recipients are not mixed with emails-only, even if only CC
[
(['"Uppercase Partner" <uppercase.partner.youpie@example.gov.uni>'], []),
([], ['"UpCc" <uppercase.customer.cc@example.gov.uni>']),
],
]
):
with self.subTest(values=recipient_values):
mail = self.env['mail.mail'].create({
'body_html': '<p>Test</p>',
'email_from': '"Forced From" <Forced.From@test.example.com>',
**recipient_values,
})
with self.mock_mail_gateway():
mail.send()
for exp_to, exp_cc in exp_recipients:
self.assertSentEmail('"Forced From" <forced.from@test.example.com>', exp_to, email_cc=exp_cc)
@mute_logger('odoo.addons.mail.models.mail_mail')
@patch('odoo.addons.base.models.ir_attachment.IrAttachment.file_size', new_callable=PropertyMock)
def test_mail_mail_send_server_attachment_to_download_link(self, mock_attachment_file_size):
""" Test that when the mail size exceeds the max email size limit,
attachments are turned into download links added at the end of the
email content.
The feature is tested in the following conditions:
- using a specified server or the default one (to test command ICP parameter)
- in batch mode
- with mail that exceed (with one or more attachments) or not the limit
- with attachment owned by a business record or not: attachments not owned by a
business record are never turned into links because their lifespans are not
controlled by the user (might even be deleted right after the message is sent).
"""
def count_attachments(message):
if isinstance(message, str):
return 0
elif message.is_multipart():
return sum(count_attachments(part) for part in message.get_payload())
elif 'attachment' in message.get('Content-Disposition', ''):
return 1
return 0
mock_attachment_file_size.return_value = 1024 * 128
# Define some constant to ease the understanding of the test
test_mail_server = self.mail_server_domain_2
max_size_always_exceed = 0.1
max_size_never_exceed = 10
for n_attachment, mail_server, business_attachment, expected_is_links in (
# 1 attachment which doesn't exceed max size
(1, self.env['ir.mail_server'], True, False),
# 3 attachment: exceed max size
(3, self.env['ir.mail_server'], True, True),
# 1 attachment: exceed max size
(1, self.env['ir.mail_server'], True, True),
# Same as above with a specific server. Note that the default and server max_email size are reversed.
(1, test_mail_server, True, False),
(3, test_mail_server, True, True),
(1, test_mail_server, True, True),
# Attachments not linked to a business record are never turned to link
(3, self.env['ir.mail_server'], False, False),
(1, test_mail_server, False, False),
):
# Setup max email size to check that the right maximum is used (default or mail server one)
if expected_is_links:
max_size_test_succeed = max_size_always_exceed * n_attachment
max_size_test_fail = max_size_never_exceed
else:
max_size_test_succeed = max_size_never_exceed
max_size_test_fail = max_size_always_exceed * n_attachment
if mail_server:
self.env['ir.config_parameter'].sudo().set_param('base.default_max_email_size', max_size_test_fail)
mail_server.max_email_size = max_size_test_succeed
else:
self.env['ir.config_parameter'].sudo().set_param('base.default_max_email_size', max_size_test_succeed)
attachments = self.env['ir.attachment'].sudo().create([{
'name': f'attachment{idx_attachment}',
'res_name': 'test',
'res_model': self.test_record._name if business_attachment else 'mail.message',
'res_id': self.test_record.id if business_attachment else 0,
'datas': 'IA==', # a non-empty base64 content. We mock attachment file_size to simulate bigger size.
} for idx_attachment in range(n_attachment)])
with self.mock_smtplib_connection():
mails = self.env['mail.mail'].create([{
'attachment_ids': attachments.ids,
'body_html': '<p>Test</p>',
'email_from': 'test@test_2.com',
'email_to': f'mail_{mail_idx}@test.com',
} for mail_idx in range(2)])
mails._send(mail_server=mail_server)
self.assertEqual(len(self.emails), 2)
for mail, outgoing_email in zip(mails, self.emails):
message_raw = outgoing_email['message']
message_parsed = message_from_string(message_raw)
message_cleaned = re.sub(r'[\s=]', '', message_raw)
with self.subTest(n_attachment=n_attachment, mail_server=mail_server,
business_attachment=business_attachment, expected_is_links=expected_is_links):
if expected_is_links:
self.assertEqual(count_attachments(message_parsed), 0,
'Attachments should have been removed (replaced by download links)')
self.assertTrue(all(attachment.access_token for attachment in attachments),
'Original attachment should have been modified (access_token added)')
self.assertTrue(all(attachment.access_token in message_cleaned for attachment in attachments),
'All attachments should have been turned into download links')
else:
self.assertEqual(count_attachments(message_parsed), n_attachment,
'All attachments should be present')
self.assertEqual(message_cleaned.count('access_token'), 0,
'Attachments should not have been turned into download links')
self.assertTrue(all(not attachment.access_token for attachment in attachments),
'Original attachment should not have been modified (access_token not added)')
@tagged('mail_mail')
class TestMailMailRace(common.TransactionCase):
@mute_logger('odoo.addons.mail.models.mail_mail')
def test_mail_bounce_during_send(self):
cr = self.registry.cursor()
env = api.Environment(cr, SUPERUSER_ID, {})
self.partner = env['res.partner'].create({
'name': 'Ernest Partner',
})
# we need to simulate a mail sent by the cron task, first create mail, message and notification by hand
mail = env['mail.mail'].sudo().create({
'body_html': '<p>Test</p>',
'is_notification': True,
'state': 'outgoing',
'recipient_ids': [(4, self.partner.id)]
})
mail_message = mail.mail_message_id
message = env['mail.message'].create({
'subject': 'S',
'body': 'B',
'subtype_id': self.ref('mail.mt_comment'),
'notification_ids': [(0, 0, {
'res_partner_id': self.partner.id,
'mail_mail_id': mail.id,
'notification_type': 'email',
'is_read': True,
'notification_status': 'ready',
})],
})
notif = env['mail.notification'].search([('res_partner_id', '=', self.partner.id)])
# we need to commit transaction or cr will keep the lock on notif
cr.commit()
# patch send_email in order to create a concurent update and check the notif is already locked by _send()
this = self # coding in javascript ruinned my life
bounce_deferred = []
@api.model
def send_email(self, message, *args, **kwargs):
with this.registry.cursor() as cr, mute_logger('odoo.sql_db'):
try:
# try ro aquire lock (no wait) on notification (should fail)
cr.execute("SELECT notification_status FROM mail_notification WHERE id = %s FOR UPDATE NOWAIT", [notif.id])
except psycopg2.OperationalError:
# record already locked by send, all good
bounce_deferred.append(True)
else:
# this should trigger psycopg2.extensions.TransactionRollbackError in send().
# Only here to simulate the initial use case
# If the record is lock, this line would create a deadlock since we are in the same thread
# In practice, the update will wait the end of the send() transaction and set the notif as bounce, as expeced
cr.execute("UPDATE mail_notification SET notification_status='bounce' WHERE id = %s", [notif.id])
return message['Message-Id']
self.patch(self.registry['ir.mail_server'], 'send_email', send_email)
mail.send()
self.assertTrue(bounce_deferred, "The bounce should have been deferred")
self.assertEqual(notif.notification_status, 'sent')
# some cleaning since we commited the cr
notif.unlink()
mail.unlink()
(mail_message | message).unlink()
self.partner.unlink()
cr.commit()
cr.close()