2287 lines
123 KiB
Python
2287 lines
123 KiB
Python
# -*- coding: utf-8 -*-
|
||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
||
import base64
|
||
import itertools
|
||
import socket
|
||
|
||
from datetime import datetime
|
||
|
||
from unittest.mock import DEFAULT
|
||
from unittest.mock import patch
|
||
|
||
from odoo import exceptions
|
||
from odoo.addons.mail.models.mail_message import Message
|
||
from odoo.addons.mail.models.mail_thread import MailThread
|
||
from odoo.addons.mail.tests.common import mail_new_test_user, MailCommon
|
||
from odoo.addons.test_mail.data import test_mail_data
|
||
from odoo.addons.test_mail.data.test_mail_data import MAIL_TEMPLATE, THAI_EMAIL_WINDOWS_874
|
||
from odoo.addons.test_mail.models.test_mail_models import MailTestGateway, MailTestGatewayGroups, MailTestTicket
|
||
from odoo.sql_db import Cursor
|
||
from odoo.tests import tagged, RecordCapturer
|
||
from odoo.tools import mute_logger
|
||
from odoo.tools.mail import email_split_and_format, formataddr
|
||
|
||
|
||
@tagged('mail_gateway')
|
||
class TestEmailParsing(MailCommon):
|
||
|
||
def test_message_parse_and_replace_binary_octetstream(self):
|
||
""" Incoming email containing a wrong Content-Type as described in RFC2046/section-3 """
|
||
received_mail = self.from_string(test_mail_data.MAIL_MULTIPART_BINARY_OCTET_STREAM)
|
||
with self.assertLogs('odoo.addons.mail.models.mail_thread', level="WARNING") as capture:
|
||
extracted_mail = self.env['mail.thread']._message_parse_extract_payload(received_mail, {})
|
||
|
||
self.assertEqual(len(extracted_mail['attachments']), 1)
|
||
attachment = extracted_mail['attachments'][0]
|
||
self.assertEqual(attachment.fname, 'hello_world.dat')
|
||
self.assertEqual(attachment.content, b'Hello world\n')
|
||
self.assertEqual(capture.output, [
|
||
("WARNING:odoo.addons.mail.models.mail_thread:Message containing an unexpected "
|
||
"Content-Type 'binary/octet-stream', assuming 'application/octet-stream'"),
|
||
])
|
||
|
||
def test_message_parse_body(self):
|
||
# test pure plaintext
|
||
plaintext = self.format(test_mail_data.MAIL_TEMPLATE_PLAINTEXT, email_from='"Sylvie Lelitre" <test.sylvie.lelitre@agrolait.com>')
|
||
res = self.env['mail.thread'].message_parse(self.from_string(plaintext))
|
||
self.assertIn('Please call me as soon as possible this afternoon!', res['body'])
|
||
|
||
# test pure html
|
||
html = self.format(test_mail_data.MAIL_TEMPLATE_HTML, email_from='"Sylvie Lelitre" <test.sylvie.lelitre@agrolait.com>')
|
||
res = self.env['mail.thread'].message_parse(self.from_string(html))
|
||
self.assertIn('<p>Please call me as soon as possible this afternoon!</p>', res['body'])
|
||
self.assertNotIn('<!DOCTYPE', res['body'])
|
||
|
||
# test multipart / text and html -> html has priority
|
||
multipart = self.format(MAIL_TEMPLATE, email_from='"Sylvie Lelitre" <test.sylvie.lelitre@agrolait.com>')
|
||
res = self.env['mail.thread'].message_parse(self.from_string(multipart))
|
||
self.assertIn('<p>Please call me as soon as possible this afternoon!</p>', res['body'])
|
||
|
||
# test multipart / mixed
|
||
res = self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_MULTIPART_MIXED))
|
||
self.assertNotIn(
|
||
'Should create a multipart/mixed: from gmail, *bold*, with attachment', res['body'],
|
||
'message_parse: text version should not be in body after parsing multipart/mixed')
|
||
self.assertIn(
|
||
'<div dir="ltr">Should create a multipart/mixed: from gmail, <b>bold</b>, with attachment.<br clear="all"><div><br></div>', res['body'],
|
||
'message_parse: html version should be in body after parsing multipart/mixed')
|
||
|
||
res = self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_MULTIPART_MIXED_TWO))
|
||
self.assertNotIn('First and second part', res['body'],
|
||
'message_parse: text version should not be in body after parsing multipart/mixed')
|
||
self.assertIn('First part', res['body'],
|
||
'message_parse: first part of the html version should be in body after parsing multipart/mixed')
|
||
self.assertIn('Second part', res['body'],
|
||
'message_parse: second part of the html version should be in body after parsing multipart/mixed')
|
||
|
||
res = self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_SINGLE_BINARY))
|
||
self.assertEqual(res['body'], '')
|
||
self.assertEqual(res['attachments'][0][0], 'thetruth.pdf')
|
||
|
||
res = self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_FORWARDED))
|
||
self.assertIn(res['recipients'], ['lucie@petitebedaine.fr,raoul@grosbedon.fr', 'raoul@grosbedon.fr,lucie@petitebedaine.fr'])
|
||
|
||
res = self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_MULTIPART_WEIRD_FILENAME))
|
||
self.assertEqual(res['attachments'][0][0], '62_@;,][)=.(ÇÀÉ.txt')
|
||
|
||
def test_message_parse_attachment_pdf_nonstandard_mime(self):
|
||
# This test checks if aliasing content-type (mime type) of "pdf" with "application/pdf" works correctly. (i.e. Treat "pdf" as "application/pdf")
|
||
|
||
# Baseline check. Parsing mail with "application/pdf"
|
||
mail_with_standard_mime = self.format(test_mail_data.MAIL_PDF_MIME_TEMPLATE, pdf_mime="application/pdf")
|
||
res_std = self.env['mail.thread'].message_parse(self.from_string(mail_with_standard_mime))
|
||
self.assertEqual(res_std['attachments'][0].content, test_mail_data.PDF_PARSED, "Attachment with Content-Type: application/pdf must parse without error")
|
||
|
||
# Parsing the same email, but with content-type set to "pdf"
|
||
mail_with_aliased_mime = self.format(test_mail_data.MAIL_PDF_MIME_TEMPLATE, pdf_mime="pdf")
|
||
res_alias = self.env['mail.thread'].message_parse(self.from_string(mail_with_aliased_mime))
|
||
self.assertEqual(res_alias['attachments'][0].content, test_mail_data.PDF_PARSED, "Attachment with aliased Content-Type: pdf must parse without error")
|
||
|
||
def test_message_parse_bugs(self):
|
||
""" Various corner cases or message parsing """
|
||
# message without Final-Recipient
|
||
self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_NO_FINAL_RECIPIENT))
|
||
|
||
# message with empty body (including only void characters)
|
||
res = self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_NO_BODY))
|
||
self.assertEqual(res['body'], '\n \n', 'Gateway should not crash with void content')
|
||
|
||
def test_message_parse_eml(self):
|
||
# Test that the parsing of mail with embedded emails as eml(msg) which generates empty attachments, can be processed.
|
||
mail = self.format(test_mail_data.MAIL_EML_ATTACHMENT, email_from='"Sylvie Lelitre" <test.sylvie.lelitre@agrolait.com>', to=f'generic@{self.alias_domain}')
|
||
self.env['mail.thread'].message_parse(self.from_string(mail))
|
||
|
||
def test_message_parse_eml_bounce_headers(self):
|
||
# Test Text/RFC822-Headers MIME content-type
|
||
msg_id = '<861878175823148.1577183525.736005783081055-openerp-19177-account.invoice@mycompany.example.com>'
|
||
mail = self.format(
|
||
test_mail_data.MAIL_EML_ATTACHMENT_BOUNCE_HEADERS,
|
||
email_from='MAILER-DAEMON@example.com (Mail Delivery System)',
|
||
to='test_bounce+82240-account.invoice-19177@mycompany.example.com',
|
||
# msg_id goes to the attachment's Message-Id header
|
||
msg_id=msg_id,
|
||
)
|
||
res = self.env['mail.thread'].message_parse(self.from_string(mail))
|
||
|
||
self.assertEqual(res['bounced_msg_ids'], [msg_id], "Message-Id is not extracted from Text/RFC822-Headers attachment")
|
||
|
||
def test_message_parse_extract_bounce_rfc822_headers_qp(self):
|
||
# Incoming bounce for unexisting Outlook address
|
||
# bounce back sometimes with a Content-Type `text/rfc822-headers`
|
||
# and Content-Type-Encoding `quoted-printable`
|
||
partner = self.env['res.partner'].create({
|
||
'name':'Mitchelle Admine',
|
||
'email':'rdesfrdgtfdrfesd@outlook.com'
|
||
})
|
||
message = self.env['mail.message'].create({
|
||
'message_id' : '<368396033905967.1673346177.695352554321289-openerp-11-sale.order@eupp00>'
|
||
})
|
||
incoming_bounce = self.format(
|
||
test_mail_data.MAIL_BOUNCE_QP_RFC822_HEADERS,
|
||
email_from='MAILER-DAEMON@mailserver.odoo.com (Mail Delivery System)',
|
||
email_to='bounce@xxx.odoo.com',
|
||
delivered_to='bounce@xxx.odoo.com'
|
||
)
|
||
msg = self.env['mail.thread'].message_parse(self.from_string(incoming_bounce))
|
||
self.assertEqual(msg['bounced_email'], partner.email, "The sender email should be correctly parsed")
|
||
self.assertEqual(msg['bounced_partner'], partner, "A partner with this email should exist")
|
||
self.assertEqual(msg['bounced_msg_ids'][0], message.message_id, "The sender message-id should correctly parsed")
|
||
self.assertEqual(msg['bounced_message'], message, "An existing message with this message_id should exist")
|
||
|
||
def test_message_parse_plaintext(self):
|
||
""" Incoming email in plaintext should be stored as html """
|
||
mail = self.format(test_mail_data.MAIL_TEMPLATE_PLAINTEXT, email_from='"Sylvie Lelitre" <test.sylvie.lelitre@agrolait.com>', to=f'generic@{self.alias_domain}')
|
||
res = self.env['mail.thread'].message_parse(self.from_string(mail))
|
||
self.assertIn('<pre>\nPlease call me as soon as possible this afternoon!\n\n--\nSylvie\n</pre>', res['body'])
|
||
|
||
def test_message_parse_xhtml(self):
|
||
# Test that the parsing of XHTML mails does not fail
|
||
self.env['mail.thread'].message_parse(self.from_string(test_mail_data.MAIL_XHTML))
|
||
|
||
|
||
@tagged('mail_gateway')
|
||
class MailGatewayCommon(MailCommon):
|
||
|
||
@classmethod
|
||
def setUpClass(cls):
|
||
super().setUpClass()
|
||
cls.mail_test_gateway_model = cls.env['ir.model']._get('mail.test.gateway')
|
||
cls.mail_test_gateway_company_model = cls.env['ir.model']._get('mail.test.gateway.company')
|
||
cls.email_from = '"Sylvie Lelitre" <test.sylvie.lelitre@agrolait.com>'
|
||
|
||
cls.test_record = cls.env['mail.test.gateway'].with_context(mail_create_nolog=True).create({
|
||
'name': 'Test',
|
||
'email_from': 'ignasse@example.com',
|
||
})
|
||
|
||
cls.partner_1 = cls.env['res.partner'].create({
|
||
'name': 'Valid Lelitre',
|
||
'email': 'valid.lelitre@agrolait.com',
|
||
})
|
||
# groups@test.mycompany.com will cause the creation of new mail.test.gateway
|
||
cls.alias = cls.env['mail.alias'].create({
|
||
'alias_domain_id': cls.mail_alias_domain.id,
|
||
'alias_contact': 'everyone',
|
||
'alias_model_id': cls.mail_test_gateway_model.id,
|
||
'alias_name': 'groups',
|
||
})
|
||
# groups@test.mycompany2.com will cause the creation of new mail.test.gateway.company
|
||
cls.alias_c2 = cls.env['mail.alias'].create({
|
||
'alias_defaults': {
|
||
'company_id': cls.company_2.id,
|
||
},
|
||
'alias_domain_id': cls.mail_alias_domain_c2.id,
|
||
'alias_contact': 'everyone',
|
||
'alias_model_id': cls.mail_test_gateway_company_model.id,
|
||
'alias_name': 'groups',
|
||
})
|
||
|
||
# Set a first message on public group to test update and hierarchy
|
||
cls.fake_email = cls._create_gateway_message(cls.test_record, '123456')
|
||
|
||
def _reinject(self, force_msg_id=False, debug_log=False):
|
||
""" Tool to automatically 'inject' an outgoing mail into the gateway.
|
||
Content changes.
|
||
|
||
:param str force_msg_id: allow to change the msg_id to simulate stupid
|
||
email providers that change message IDs;
|
||
"""
|
||
self.assertEqual(len(self._mails), 1)
|
||
mail = self._mails[0]
|
||
extra = f'References: {mail["references"]}'
|
||
if mail["headers"].get("X-Odoo-Message-Id"):
|
||
extra += f'\nX-Odoo-Message-Id: {mail["headers"]["X-Odoo-Message-Id"]}'
|
||
with self.mock_mail_gateway(), self.mock_mail_app():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, mail['email_from'], ','.join(mail['email_to']),
|
||
msg_id=force_msg_id or mail['message_id'], extra=extra,
|
||
debug_log=debug_log,
|
||
)
|
||
|
||
@classmethod
|
||
def _create_gateway_message(cls, record, msg_id_prefix, **values):
|
||
msg_values = {
|
||
'author_id': cls.partner_1.id,
|
||
'email_from': cls.partner_1.email_formatted,
|
||
'body': '<p>Generic body</p>',
|
||
'message_id': f'<{msg_id_prefix}-openerp-{record.id}-{record._name}@{socket.gethostname()}>',
|
||
'message_type': 'email',
|
||
'model': record._name,
|
||
'res_id': record.id,
|
||
'subject': 'Generic Message',
|
||
'subtype_id': cls.env.ref('mail.mt_comment').id,
|
||
}
|
||
msg_values.update(**values)
|
||
return cls.env['mail.message'].create(msg_values)
|
||
|
||
|
||
@tagged('mail_gateway')
|
||
class TestMailgateway(MailGatewayCommon):
|
||
|
||
def test_assert_initial_values(self):
|
||
""" Just some basics checks to ensure tests coherency """
|
||
self.assertEqual(len(self.test_record.message_ids), 1)
|
||
|
||
# --------------------------------------------------
|
||
# Base low-level tests
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_alias_basic(self):
|
||
""" Test details of created message going through mailgateway """
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Specific')
|
||
|
||
# Test: one group created by mailgateway administrator as user_id is not set
|
||
self.assertEqual(len(record), 1, 'message_process: a new mail.test should have been created')
|
||
res = record.get_metadata()[0].get('create_uid') or [None]
|
||
self.assertEqual(res[0], self.env.uid)
|
||
|
||
# Test: one message that is the incoming email
|
||
self.assertEqual(len(record.message_ids), 1)
|
||
msg = record.message_ids[0]
|
||
self.assertEqual(msg.subject, 'Specific')
|
||
self.assertIn('Please call me as soon as possible this afternoon!', msg.body)
|
||
self.assertEqual(msg.message_type, 'email')
|
||
self.assertEqual(msg.subtype_id, self.env.ref('mail.mt_comment'))
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_cid(self):
|
||
origin_message_parse_extract_payload = MailThread._message_parse_extract_payload
|
||
|
||
def _message_parse_extract_payload(this, *args, **kwargs):
|
||
res = origin_message_parse_extract_payload(this, *args, **kwargs)
|
||
self.assertTrue(isinstance(res['body'], str), 'Body from extracted payload should still be a string.')
|
||
return res
|
||
|
||
with patch.object(MailThread, '_message_parse_extract_payload', _message_parse_extract_payload):
|
||
record = self.format_and_process(test_mail_data.MAIL_MULTIPART_IMAGE, self.email_from, f'groups@{self.alias_domain}')
|
||
message = record.message_ids[0]
|
||
for attachment in message.attachment_ids:
|
||
self.assertIn(f'/web/image/{attachment.id}', message.body)
|
||
self.assertEqual(
|
||
set(message.attachment_ids.mapped('name')),
|
||
set(['rosaçée.gif', 'verte!µ.gif', 'orangée.gif']))
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_followers(self):
|
||
""" Incoming email: recognized author not archived and not odoobot:
|
||
added as follower. Also test corner cases: archived. """
|
||
partner_archived = self.env['res.partner'].create({
|
||
'active': False,
|
||
'email': 'archived.customer@text.example.com',
|
||
'phone': '0032455112233',
|
||
'name': 'Archived Customer',
|
||
'type': 'contact',
|
||
})
|
||
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email_formatted, f'groups@{self.alias_domain}')
|
||
|
||
self.assertEqual(record.message_ids[0].author_id, self.partner_1,
|
||
'message_process: recognized email -> author_id')
|
||
self.assertEqual(record.message_ids[0].email_from, self.partner_1.email_formatted)
|
||
self.assertEqual(record.message_follower_ids.partner_id, self.partner_1,
|
||
'message_process: recognized email -> added as follower')
|
||
self.assertEqual(record.message_partner_ids, self.partner_1,
|
||
'message_process: recognized email -> added as follower')
|
||
|
||
# just an email -> no follower
|
||
with self.mock_mail_gateway():
|
||
record2 = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Another Email')
|
||
|
||
self.assertEqual(record2.message_ids[0].author_id, self.env['res.partner'])
|
||
self.assertEqual(record2.message_ids[0].email_from, self.email_from)
|
||
self.assertEqual(record2.message_follower_ids.partner_id, self.env['res.partner'],
|
||
'message_process: unrecognized email -> no follower')
|
||
self.assertEqual(record2.message_partner_ids, self.env['res.partner'],
|
||
'message_process: unrecognized email -> no follower')
|
||
|
||
# archived partner -> no follower
|
||
with self.mock_mail_gateway():
|
||
record3 = self.format_and_process(
|
||
MAIL_TEMPLATE, partner_archived.email_formatted, f'groups@{self.alias_domain}',
|
||
subject='Archived Partner')
|
||
|
||
self.assertEqual(record3.message_ids[0].author_id, self.env['res.partner'])
|
||
self.assertEqual(record3.message_ids[0].email_from, partner_archived.email_formatted)
|
||
self.assertEqual(record3.message_follower_ids.partner_id, self.env['res.partner'],
|
||
'message_process: archived partner -> no follower')
|
||
self.assertEqual(record3.message_partner_ids, self.env['res.partner'],
|
||
'message_process: archived partner -> no follower')
|
||
|
||
# partner_root -> never again
|
||
odoobot = self.env.ref('base.partner_root')
|
||
odoobot.active = True
|
||
odoobot.email = 'odoobot@example.com'
|
||
with self.mock_mail_gateway():
|
||
record4 = self.format_and_process(
|
||
MAIL_TEMPLATE, odoobot.email_formatted, f'groups@{self.alias_domain}',
|
||
subject='Odoobot Automatic Answer')
|
||
|
||
self.assertEqual(record4.message_ids[0].author_id, odoobot)
|
||
self.assertEqual(record4.message_ids[0].email_from, odoobot.email_formatted)
|
||
self.assertEqual(record4.message_follower_ids.partner_id, self.env['res.partner'],
|
||
'message_process: odoobot -> no follower')
|
||
self.assertEqual(record4.message_partner_ids, self.env['res.partner'],
|
||
'message_process: odoobot -> no follower')
|
||
|
||
# --------------------------------------------------
|
||
# Author recognition
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_email_email_from(self):
|
||
""" Incoming email: not recognized author: email_from, no author_id, no followers """
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}')
|
||
self.assertFalse(record.message_ids[0].author_id, 'message_process: unrecognized email -> no author_id')
|
||
self.assertEqual(record.message_ids[0].email_from, self.email_from)
|
||
self.assertEqual(len(record.message_partner_ids), 0,
|
||
'message_process: newly create group should not have any follower')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_email_author(self):
|
||
""" Incoming email: recognized author: email_from, author_id, added as follower """
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email_formatted, f'groups@{self.alias_domain}', subject='Test1')
|
||
|
||
self.assertEqual(record.message_ids[0].author_id, self.partner_1,
|
||
'message_process: recognized email -> author_id')
|
||
self.assertEqual(record.message_ids[0].email_from, self.partner_1.email_formatted)
|
||
self.assertNotSentEmail() # No notification / bounce should be sent
|
||
|
||
# Email recognized if partner has a formatted email
|
||
self.partner_1.write({'email': f'"Valid Lelitre" <{self.partner_1.email}>'})
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email, f'groups@{self.alias_domain}', subject='Test2')
|
||
|
||
self.assertEqual(record.message_ids[0].author_id, self.partner_1,
|
||
'message_process: recognized email -> author_id')
|
||
self.assertEqual(record.message_ids[0].email_from, self.partner_1.email)
|
||
self.assertNotSentEmail() # No notification / bounce should be sent
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_email_author_multiemail(self):
|
||
""" Incoming email: recognized author: check multi/formatted email in field """
|
||
test_email = 'valid.lelitre@agrolait.com'
|
||
# Email not recognized if partner has a multi-email (source = formatted email)
|
||
self.partner_1.write({'email': f'{test_email}, "Valid Lelitre" <another.email@test.example.com>'})
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, f'"Valid Lelitre" <{test_email}>', f'groups@{self.alias_domain}', subject='Test3')
|
||
|
||
self.assertEqual(record.message_ids[0].author_id, self.partner_1,
|
||
'message_process: found author based on first found email normalized, even with multi emails')
|
||
self.assertEqual(record.message_ids[0].email_from, f'"Valid Lelitre" <{test_email}>')
|
||
self.assertNotSentEmail() # No notification / bounce should be sent
|
||
|
||
# Email not recognized if partner has a multi-email (source = std email)
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, test_email, f'groups@{self.alias_domain}', subject='Test4')
|
||
|
||
self.assertEqual(record.message_ids[0].author_id, self.partner_1,
|
||
'message_process: found author based on first found email normalized, even with multi emails')
|
||
self.assertEqual(record.message_ids[0].email_from, test_email)
|
||
self.assertNotSentEmail() # No notification / bounce should be sent
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.tests')
|
||
def test_message_process_email_partner_find(self):
|
||
""" Finding the partner based on email, based on partner / user / follower """
|
||
self.alias.write({'alias_force_thread_id': self.test_record.id})
|
||
from_1 = self.env['res.partner'].create({'name': 'Brice Denisse', 'email': 'from.test@example.com'})
|
||
|
||
self.format_and_process(MAIL_TEMPLATE, from_1.email_formatted, f'groups@{self.alias_domain}')
|
||
self.assertEqual(self.test_record.message_ids[0].author_id, from_1)
|
||
self.test_record.message_unsubscribe([from_1.id])
|
||
|
||
from_2 = mail_new_test_user(self.env, login='B', groups='base.group_user', name='User Denisse', email='from.test@example.com')
|
||
|
||
self.format_and_process(MAIL_TEMPLATE, from_1.email_formatted, f'groups@{self.alias_domain}')
|
||
self.assertEqual(self.test_record.message_ids[0].author_id, from_2.partner_id)
|
||
self.test_record.message_unsubscribe([from_2.partner_id.id])
|
||
|
||
from_3 = self.env['res.partner'].create({'name': 'FOllower Denisse', 'email': 'from.test@example.com'})
|
||
self.test_record.message_subscribe([from_3.id])
|
||
|
||
self.format_and_process(MAIL_TEMPLATE, from_1.email_formatted, f'groups@{self.alias_domain}')
|
||
self.assertEqual(self.test_record.message_ids[0].author_id, from_3)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_email_author_exclude_alias(self):
|
||
""" Do not set alias as author to avoid including aliases in discussions """
|
||
from_1 = self.env['res.partner'].create({
|
||
'name': 'Brice Denisse',
|
||
'email': f'from.test@{self.mail_alias_domain.name}',
|
||
})
|
||
self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'from.test',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.gateway').id
|
||
})
|
||
|
||
record = self.format_and_process(MAIL_TEMPLATE, from_1.email_formatted, f'groups@{self.alias_domain}')
|
||
self.assertFalse(record.message_ids[0].author_id)
|
||
self.assertEqual(record.message_ids[0].email_from, from_1.email_formatted)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_owner_author_notify(self):
|
||
""" Make sure users are notified when a reply is sent to an alias address.
|
||
Alias owner should impact the message creator, but not notifications. """
|
||
test_record = self.env['mail.test.ticket'].create({})
|
||
author_partner = self.env['res.partner'].create({
|
||
'name': 'Author',
|
||
'email': f'author-partner@{self.alias_domain}',
|
||
})
|
||
message = self.env['mail.message'].create({
|
||
'body': '<p>test</p>',
|
||
'email_from': f'author-partner@{self.alias_domain}', # email sent by author who also has an alias with their email
|
||
'message_type': 'email_outgoing',
|
||
'model': test_record._name,
|
||
'res_id': test_record.id,
|
||
})
|
||
self.env['mail.alias'].create({
|
||
'alias_model_id': self.env['ir.model']._get_id(test_record._name),
|
||
'alias_name': 'author-partner',
|
||
})
|
||
|
||
test_record.message_subscribe((author_partner | self.user_employee.partner_id).ids)
|
||
|
||
messages = test_record.message_ids
|
||
|
||
self.assertFalse(self.user_root.active, 'notification logic relies on odoobot being archived')
|
||
|
||
test_users = [self.user_employee, self.user_root]
|
||
email_tos = [f'author-partner@{self.alias_domain}', f'some_non_aliased_email@{self.alias_domain}']
|
||
for email_to, test_user in itertools.product(email_tos, test_users):
|
||
with self.subTest(test_user=test_user, email_to=email_to):
|
||
with self.mock_mail_gateway(), self.mock_mail_app():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, email_to,
|
||
subject=message.message_id, extra=f'In-Reply-To:\r\n\t{message.message_id}\n',
|
||
model=None, with_user=test_user)
|
||
new_messages = test_record.message_ids - messages
|
||
|
||
self.assertEqual(len(new_messages), 1)
|
||
self.assertEqual(new_messages.create_uid, self.user_root,
|
||
'Odoobot should be creating the message')
|
||
|
||
# Make sure the alias owner is notified if they are a follower
|
||
self.assertNotified(new_messages, [{
|
||
'partner': self.user_employee.partner_id,
|
||
'is_read': False,
|
||
'type': 'inbox',
|
||
}])
|
||
# never notify the author of the incoming message
|
||
with self.assertRaises(Exception):
|
||
self.assertNotified(new_messages, [{'partner': author_partner}])
|
||
|
||
messages = test_record.message_ids
|
||
|
||
# --------------------------------------------------
|
||
# Alias configuration
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail')
|
||
def test_message_process_alias_config_bounced_content(self):
|
||
""" Custom bounced message for the alias => Received this custom message """
|
||
self.alias.write({
|
||
'alias_contact': 'partners',
|
||
'alias_bounced_content': '<p>What Is Dead May Never Die</p>'
|
||
})
|
||
|
||
# Test: custom bounced content
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Should Bounce')
|
||
self.assertFalse(record, 'message_process: should have bounced')
|
||
self.assertSentEmail(f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>', ['whatever-2a840@postmaster.twitter.com'], body_content='<p>What Is Dead May Never Die</p>')
|
||
|
||
for empty_content in [
|
||
'<p><br></p>', '<p><br> </p>', '<p><br /></p >',
|
||
'<p style="margin: 4px"></p>',
|
||
'<div style="margin: 4px"></div>',
|
||
'<p class="oe_testing"><br></p>',
|
||
'<p><span style="font-weight: bolder;"><font style="color: rgb(255, 0, 0);" class=" "></font></span><br></p>',
|
||
]:
|
||
self.alias.write({
|
||
'alias_contact': 'partners',
|
||
'alias_bounced_content': empty_content,
|
||
})
|
||
|
||
# Test: with "empty" bounced content (simulate view, putting always '<p></br></p>' in html field)
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Should Bounce')
|
||
self.assertFalse(record, 'message_process: should have bounced')
|
||
# Check if default (hardcoded) value is in the mail content
|
||
self.assertSentEmail(
|
||
f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
['whatever-2a840@postmaster.twitter.com'],
|
||
body_content=f'<p>Dear Sender,<br /><br />The message below could not be accepted by the address {self.alias.display_name.lower()}',
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.addons.mail.models.mail_mail', 'odoo.models.unlink')
|
||
def test_message_process_alias_config_bounced_to(self):
|
||
""" Check bounce message contains the bouncing alias, not a generic "to" """
|
||
self.alias.write({'alias_contact': 'partners'})
|
||
bounce_message_with_alias = f'<p>Dear Sender,<br /><br />The message below could not be accepted by the address {self.alias.display_name.lower()}'
|
||
|
||
# Bounce is To
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
cc='other@gmail.com', subject='Should Bounce')
|
||
self.assertIn(bounce_message_with_alias, self._mails[0].get('body'))
|
||
|
||
# Bounce is CC
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, 'other@gmail.com',
|
||
cc=f'groups@{self.alias_domain}', subject='Should Bounce')
|
||
self.assertIn(bounce_message_with_alias, self._mails[0].get('body'))
|
||
|
||
# Bounce is part of To
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'other@gmail.com, groups@{self.alias_domain}',
|
||
subject='Should Bounce')
|
||
self.assertIn(bounce_message_with_alias, self._mails[0].get('body'))
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.addons.mail.models.mail_mail', 'odoo.models', 'odoo.sql_db')
|
||
def test_message_process_alias_config_invalid_defaults(self):
|
||
"""Sending a mail to a misconfigured alias must change its status to
|
||
invalid and notify sender and alias creator."""
|
||
test_model_track = self.env['ir.model']._get('mail.test.track')
|
||
container_custom = self.env['mail.test.container'].create({})
|
||
alias_valid = self.env['mail.alias'].with_user(self.user_admin).create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'valid',
|
||
'alias_model_id': test_model_track.id,
|
||
'alias_contact': 'everyone',
|
||
'alias_defaults': f"{{'container_id': {container_custom.id}}}",
|
||
})
|
||
self.assertEqual(alias_valid.create_uid, self.user_admin)
|
||
|
||
# Test that it works when the reference to container_id in alias default is not dangling.
|
||
self.assertEqual(alias_valid.alias_status, 'not_tested')
|
||
with self.mock_mail_gateway(), patch('odoo.addons.mail.models.mail_alias.Alias._alias_bounce_incoming_email',
|
||
autospec=True) as _alias_bounce_incoming_email_mock:
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'valid@{self.alias_domain}', subject='Valid',
|
||
target_model=test_model_track.model)
|
||
_alias_bounce_incoming_email_mock.assert_not_called()
|
||
self.assertNotSentEmail()
|
||
self.assertEqual(record.container_id, container_custom)
|
||
self.assertEqual(alias_valid.alias_status, 'valid')
|
||
|
||
# Test with a dangling reference that must trigger bounce emails and set the alias status to invalid.
|
||
container_custom.unlink()
|
||
with self.assertRaises(Exception), patch('odoo.addons.mail.models.mail_alias.Alias._alias_bounce_incoming_email',
|
||
autospec=True) as _alias_bounce_incoming_email_mock:
|
||
self.format_and_process(MAIL_TEMPLATE, self.email_from, f'valid@{self.alias_domain}', subject='Invalid',
|
||
target_model=test_model_track.model)
|
||
|
||
# method executed in another transaction, so we cannot test its result directly but just below
|
||
_alias_bounce_incoming_email_mock.assert_called_once()
|
||
|
||
# call notify_alias_invalid on the test transaction to validate its effect
|
||
alias, message, message_dict = _alias_bounce_incoming_email_mock.call_args.args
|
||
with self.mock_mail_gateway():
|
||
alias = self.env['mail.alias'].browse(alias.id) # load alias in test transaction
|
||
alias._alias_bounce_incoming_email(message, message_dict)
|
||
|
||
self.assertEqual(alias_valid.alias_status, 'invalid')
|
||
self.assertSentEmail(f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
[self.user_admin.email_formatted],
|
||
subject='Re: Invalid')
|
||
# Not sent to self.email_from because a return path is present in MAIL_TEMPLATE
|
||
self.assertSentEmail(f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
['whatever-2a840@postmaster.twitter.com'],
|
||
subject='Re: Invalid',
|
||
body=alias_valid._get_alias_invalid_body(message_dict))
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_alias_defaults(self):
|
||
""" Test alias defaults and inner values """
|
||
self.alias.write({
|
||
'alias_defaults': "{'custom_field': 'defaults_custom'}"
|
||
})
|
||
self.assertEqual(self.alias.alias_status, 'not_tested')
|
||
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Specific'
|
||
)
|
||
self.assertEqual(self.alias.alias_status, 'valid')
|
||
self.assertEqual(len(record), 1)
|
||
self.assertEqual(record.name, 'Specific')
|
||
self.assertEqual(record.custom_field, 'defaults_custom')
|
||
|
||
self.alias.write({'alias_defaults': '""'})
|
||
self.assertEqual(self.alias.alias_status, 'not_tested', 'Updating alias_defaults must reset status')
|
||
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Specific2'
|
||
)
|
||
self.assertEqual(len(record), 1)
|
||
self.assertEqual(record.name, 'Specific2')
|
||
self.assertFalse(record.custom_field)
|
||
self.assertEqual(self.alias.alias_status, 'valid')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_alias_everyone(self):
|
||
""" Incoming email: everyone: new record + message_new """
|
||
self.alias.write({'alias_contact': 'everyone'})
|
||
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Specific')
|
||
self.assertEqual(len(record), 1)
|
||
self.assertEqual(len(record.message_ids), 1)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail')
|
||
def test_message_process_alias_partners_bounce(self):
|
||
""" Incoming email from an unknown partner on a Partners only alias -> bounce + test bounce email """
|
||
self.alias.write({'alias_contact': 'partners'})
|
||
|
||
# Test: no group created, email bounced
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Should Bounce')
|
||
self.assertFalse(record)
|
||
self.assertSentEmail(f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>', ['whatever-2a840@postmaster.twitter.com'], subject='Re: Should Bounce')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail')
|
||
def test_message_process_alias_followers_bounce(self):
|
||
""" Incoming email from unknown partner / not follower partner on a Followers only alias -> bounce """
|
||
self.alias.write({
|
||
'alias_contact': 'followers',
|
||
'alias_parent_model_id': self.env['ir.model']._get('mail.test.gateway').id,
|
||
'alias_parent_thread_id': self.test_record.id,
|
||
})
|
||
|
||
# Test: unknown on followers alias -> bounce
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Should Bounce')
|
||
self.assertFalse(record, 'message_process: should have bounced')
|
||
self.assertSentEmail(
|
||
f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
['whatever-2a840@postmaster.twitter.com'],
|
||
subject='Re: Should Bounce'
|
||
)
|
||
|
||
# Test: partner on followers alias -> bounce
|
||
self._init_mail_mock()
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email_formatted, f'groups@{self.alias_domain}', subject='Should Bounce')
|
||
self.assertFalse(record, 'message_process: should have bounced')
|
||
self.assertSentEmail(
|
||
f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
['whatever-2a840@postmaster.twitter.com'],
|
||
subject='Re: Should Bounce'
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_alias_partner(self):
|
||
""" Incoming email from a known partner on a Partners alias -> ok (+ test on alias.user_id) """
|
||
self.alias.write({'alias_contact': 'partners'})
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email_formatted, f'groups@{self.alias_domain}')
|
||
|
||
# Test: one group created by alias user
|
||
self.assertEqual(len(record), 1)
|
||
self.assertEqual(len(record.message_ids), 1)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_alias_followers(self):
|
||
""" Incoming email from a parent document follower on a Followers only alias -> ok """
|
||
self.alias.write({
|
||
'alias_contact': 'followers',
|
||
'alias_parent_model_id': self.env['ir.model']._get('mail.test.gateway').id,
|
||
'alias_parent_thread_id': self.test_record.id,
|
||
})
|
||
self.test_record.message_subscribe(partner_ids=[self.partner_1.id])
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email_formatted, f'groups@{self.alias_domain}')
|
||
|
||
# Test: one group created by Raoul (or Sylvie maybe, if we implement it)
|
||
self.assertEqual(len(record), 1)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models', 'odoo.tests')
|
||
def test_message_process_alias_followers_multiemail(self):
|
||
""" Incoming email from a parent document follower on a Followers only
|
||
alias depends on email_from / partner recognition, to be tested when
|
||
dealing with multi emails / formatted emails. """
|
||
self.alias.write({
|
||
'alias_contact': 'followers',
|
||
'alias_parent_model_id': self.env['ir.model']._get('mail.test.gateway').id,
|
||
'alias_parent_thread_id': self.test_record.id,
|
||
})
|
||
self.test_record.message_subscribe(partner_ids=[self.partner_1.id])
|
||
email_from = formataddr(("Another Name", self.partner_1.email_normalized))
|
||
|
||
for partner_email, passed in [
|
||
(formataddr((self.partner_1.name, self.partner_1.email_normalized)), True),
|
||
(f'{self.partner_1.email_normalized}, "Multi Email" <multi.email@test.example.com>', True),
|
||
(f'"Multi Email" <multi.email@test.example.com>, {self.partner_1.email_normalized}', False),
|
||
]:
|
||
with self.subTest(partner_email=partner_email):
|
||
self.partner_1.write({'email': partner_email})
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, email_from, f'groups@{self.alias_domain}',
|
||
subject=f'Test for {partner_email}')
|
||
|
||
if passed:
|
||
self.assertEqual(len(record), 1)
|
||
self.assertEqual(record.email_from, email_from)
|
||
self.assertEqual(record.message_partner_ids, self.partner_1)
|
||
# multi emails not recognized (no normalized email, recognition)
|
||
else:
|
||
self.assertEqual(len(record), 0,
|
||
'Alias check (FIXME): multi-emails bad support for recognition')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail')
|
||
def test_message_process_alias_update(self):
|
||
""" Incoming email update discussion + notification email """
|
||
self.alias.write({'alias_force_thread_id': self.test_record.id})
|
||
|
||
self.test_record.message_subscribe(partner_ids=[self.partner_1.id])
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
msg_id='<1198923581.41972151344608186799.JavaMail.diff1@agrolait.com>', subject='Re: cats')
|
||
|
||
# Test: no new group + new message
|
||
self.assertFalse(record, 'message_process: alias update should not create new records')
|
||
self.assertEqual(len(self.test_record.message_ids), 2)
|
||
# Test: sent emails: 1 (Sylvie copy of the incoming email)
|
||
self.assertSentEmail(self.email_from, [self.partner_1], subject='Re: cats')
|
||
|
||
# --------------------------------------------------
|
||
# Creator recognition
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_create_uid_crash(self):
|
||
def _employee_crash(records, operation):
|
||
""" If employee is test employee, consider they have no access on document """
|
||
if records.env.uid == self.user_employee.id and not records.env.su:
|
||
return lambda: exceptions.AccessError('Hop hop hop Ernest, please step back.'), records
|
||
return DEFAULT
|
||
|
||
with patch.object(MailTestGateway, 'check_access', autospec=True, side_effect=_employee_crash):
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.user_employee.email_formatted, f'groups@{self.alias_domain}', subject='NoEmployeeAllowed')
|
||
self.assertEqual(record.create_uid, self.user_employee)
|
||
self.assertEqual(record.message_ids[0].subject, 'NoEmployeeAllowed')
|
||
self.assertEqual(record.message_ids[0].create_uid, self.user_root, 'Message should be created by caller of message_process.')
|
||
self.assertEqual(record.message_ids[0].author_id, self.user_employee.partner_id)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_create_uid_email(self):
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.user_employee.email_formatted, f'groups@{self.alias_domain}', subject='Email Found')
|
||
self.assertEqual(record.create_uid, self.user_employee)
|
||
self.assertEqual(record.message_ids[0].subject, 'Email Found')
|
||
self.assertEqual(record.message_ids[0].create_uid, self.user_root)
|
||
self.assertEqual(record.message_ids[0].author_id, self.user_employee.partner_id)
|
||
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, f'Another name <{self.user_employee.email}>',
|
||
f'groups@{self.alias_domain}',
|
||
subject='Email OtherName')
|
||
self.assertEqual(record.create_uid, self.user_employee)
|
||
self.assertEqual(record.message_ids[0].subject, 'Email OtherName')
|
||
self.assertEqual(record.message_ids[0].create_uid, self.user_root)
|
||
self.assertEqual(record.message_ids[0].author_id, self.user_employee.partner_id)
|
||
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.user_employee.email_normalized, f'groups@{self.alias_domain}', subject='Email SimpleEmail')
|
||
self.assertEqual(record.create_uid, self.user_employee)
|
||
self.assertEqual(record.message_ids[0].subject, 'Email SimpleEmail')
|
||
self.assertEqual(record.message_ids[0].create_uid, self.user_root)
|
||
self.assertEqual(record.message_ids[0].author_id, self.user_employee.partner_id)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink')
|
||
def test_message_process_create_uid_email_follower(self):
|
||
self.alias.write({
|
||
'alias_parent_model_id': self.env['ir.model']._get_id(self.test_record._name),
|
||
'alias_parent_thread_id': self.test_record.id,
|
||
})
|
||
follower_user = mail_new_test_user(self.env, login='better', groups='base.group_user', name='Ernest Follower', email=self.user_employee.email)
|
||
self.test_record.message_subscribe(follower_user.partner_id.ids)
|
||
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.user_employee.email_formatted, f'groups@{self.alias_domain}', subject='FollowerWinner')
|
||
self.assertEqual(record.create_uid, follower_user)
|
||
self.assertEqual(record.message_ids[0].subject, 'FollowerWinner')
|
||
self.assertEqual(record.message_ids[0].create_uid, self.user_root)
|
||
self.assertEqual(record.message_ids[0].author_id, follower_user.partner_id)
|
||
|
||
# name order win
|
||
self.test_record.message_unsubscribe(follower_user.partner_id.ids)
|
||
self.test_record.flush_recordset()
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.user_employee.email_formatted, f'groups@{self.alias_domain}', subject='FirstFoundWinner')
|
||
self.assertEqual(record.create_uid, self.user_employee)
|
||
self.assertEqual(record.message_ids[0].subject, 'FirstFoundWinner')
|
||
self.assertEqual(record.message_ids[0].create_uid, self.user_root)
|
||
self.assertEqual(record.message_ids[0].author_id, self.user_employee.partner_id)
|
||
|
||
# --------------------------------------------------
|
||
# Alias routing management
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_no_domain(self):
|
||
""" Incoming email: write to alias with no domain set: not recognized as
|
||
a valid alias even when local-part only is checked. """
|
||
self.alias.alias_domain_id = False
|
||
|
||
for incoming_ok in [True, False]:
|
||
with self.subTest(incoming_ok=incoming_ok):
|
||
with self.assertRaises(ValueError):
|
||
_new_record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted, f'groups@{self.alias_domain}',
|
||
subject='Test Subject'
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_alias_incoming_local(self):
|
||
""" Incoming email: write to alias using local part only: depends on
|
||
alias accepting local only flag. """
|
||
self.alias.alias_incoming_local = True
|
||
new_record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted, 'groups@another.domain.com',
|
||
subject='Test Subject Global'
|
||
)
|
||
self.assertEqual(len(new_record), 1, 'message_process: a new mail.test.simple should have been created')
|
||
|
||
self.alias.alias_incoming_local = False
|
||
with self.assertRaises(ValueError):
|
||
_new_record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted, 'groups@another.domain.com',
|
||
subject='Test Subject Local'
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_forward_bypass_reply_first(self):
|
||
""" Incoming email: write to two "new thread" alias, one as a reply, one being another model -> consider as a forward """
|
||
self.assertEqual(len(self.test_record.message_ids), 1)
|
||
|
||
# test@.. will cause the creation of new mail.test
|
||
new_alias_2 = self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.container').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
new_rec = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{new_alias_2.display_name}, {self.alias.display_name}',
|
||
subject='Test Subject',
|
||
extra=f'In-Reply-To:\r\n\t{self.fake_email.message_id}\n',
|
||
target_model=new_alias_2.alias_model_id.model
|
||
)
|
||
# Forward created a new record in mail.test
|
||
self.assertEqual(len(new_rec), 1, 'message_process: a new mail.test should have been created')
|
||
self.assertEqual(new_rec._name, new_alias_2.alias_model_id.model)
|
||
# No new post on test_record, no new record in mail.test.simple either
|
||
self.assertEqual(len(self.test_record.message_ids), 1, 'message_process: should not post on replied record as forward should bypass it')
|
||
new_simple = self.env['mail.test.simple'].search([('name', '=', 'Test Subject')])
|
||
self.assertEqual(len(new_simple), 0, 'message_process: a new mail.test should not have been created')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_forward_bypass_reply_second(self):
|
||
""" Incoming email: write to two "new thread" alias, one as a reply, one being another model -> consider as a forward """
|
||
self.assertEqual(len(self.test_record.message_ids), 1)
|
||
|
||
# test@.. will cause the creation of new mail.test
|
||
new_alias_2 = self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.container').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
new_rec = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{self.alias.display_name}, {new_alias_2.display_name}',
|
||
subject='Test Subject',
|
||
extra=f'In-Reply-To:\r\n\t{self.fake_email.message_id}\n',
|
||
target_model=new_alias_2.alias_model_id.model
|
||
)
|
||
# Forward created a new record in mail.test
|
||
self.assertEqual(len(new_rec), 1, 'message_process: a new mail.test should have been created')
|
||
self.assertEqual(new_rec._name, new_alias_2.alias_model_id.model)
|
||
# No new post on test_record, no new record in mail.test.simple either
|
||
self.assertEqual(len(self.test_record.message_ids), 1, 'message_process: should not post on replied record as forward should bypass it')
|
||
new_simple = self.env['mail.test.simple'].search([('name', '=', 'Test Subject')])
|
||
self.assertEqual(len(new_simple), 0, 'message_process: a new mail.test should not have been created')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_forward_bypass_update_alias(self):
|
||
""" Incoming email: write to one "update", one "new thread" alias, one as a reply, one being another model -> consider as a forward """
|
||
self.assertEqual(len(self.test_record.message_ids), 1)
|
||
self.alias.write({
|
||
'alias_force_thread_id': self.test_record.id,
|
||
})
|
||
|
||
# test@.. will cause the creation of new mail.test
|
||
new_alias_2 = self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.container').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
new_rec = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{new_alias_2.display_name}, {self.alias.display_name}',
|
||
subject='Test Subject',
|
||
extra=f'In-Reply-To:\r\n\t{self.fake_email.message_id}\n',
|
||
target_model=new_alias_2.alias_model_id.model
|
||
)
|
||
# Forward created a new record in mail.test
|
||
self.assertEqual(len(new_rec), 1, 'message_process: a new mail.test should have been created')
|
||
self.assertEqual(new_rec._name, new_alias_2.alias_model_id.model)
|
||
# No new post on test_record, no new record in mail.test.simple either
|
||
self.assertEqual(len(self.test_record.message_ids), 1, 'message_process: should not post on replied record as forward should bypass it')
|
||
# No new record on first alias model
|
||
new_simple = self.env['mail.test.gateway'].search([('name', '=', 'Test Subject')])
|
||
self.assertEqual(len(new_simple), 0, 'message_process: a new mail.test should not have been created')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_multiple_new(self):
|
||
""" Incoming email: write to two aliases creating records: both should be activated """
|
||
# test@.. will cause the creation of new mail.test
|
||
new_alias_2 = self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.container').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
new_rec = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{self.alias.display_name}, {new_alias_2.display_name}',
|
||
subject='Test Subject',
|
||
target_model=new_alias_2.alias_model_id.model
|
||
)
|
||
# New record in both mail.test (new_alias_2) and mail.test.simple (self.alias)
|
||
self.assertEqual(len(new_rec), 1, 'message_process: a new mail.test should have been created')
|
||
self.assertEqual(new_rec._name, new_alias_2.alias_model_id.model)
|
||
new_simple = self.env['mail.test.gateway'].search([('name', '=', 'Test Subject')])
|
||
self.assertEqual(len(new_simple), 1, 'message_process: a new mail.test should have been created')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_alias_with_allowed_domains(self):
|
||
""" Incoming email: check that if domains are set in the optional system
|
||
parameter `mail.catchall.domain.allowed` only incoming emails from these
|
||
domains will generate records."""
|
||
# test@.. will cause the creation of new mail.test.container
|
||
new_alias_2 = self.env['mail.alias'].create({
|
||
'alias_contact': 'everyone',
|
||
'alias_domain_id': self.mail_alias_domain_c2.id,
|
||
'alias_incoming_local': True,
|
||
'alias_model_id': self.env['ir.model']._get_id('mail.test.container.mc'),
|
||
'alias_name': 'test',
|
||
})
|
||
|
||
test_domain = 'hello.com'
|
||
for (alias_right_part, allowed_domain), container_created in zip(
|
||
[
|
||
# Test a valid alias domain, standard case
|
||
(self.mail_alias_domain_c2.name, ""),
|
||
# Test with 'mail.catchall.domain.allowed' not set in system parameters
|
||
# and with a domain not allowed
|
||
('bonjour.com', ""),
|
||
# Test with 'mail.catchall.domain.allowed' set in system parameters
|
||
# and with a domain not allowed
|
||
('bonjour.com', test_domain),
|
||
# Test with 'mail.catchall.domain.allowed' set in system parameters
|
||
# and with a domain allowed
|
||
(test_domain, test_domain),
|
||
], [True, True, False, True]):
|
||
with self.subTest(alias_right_part=alias_right_part, allowed_domain=allowed_domain):
|
||
self.env['ir.config_parameter'].set_param('mail.catchall.domain.allowed', allowed_domain)
|
||
|
||
subject = f'Test wigh {alias_right_part}-{allowed_domain}'
|
||
email_to = f'{self.alias.alias_name}@{self.alias_domain}, {new_alias_2.alias_name}@{alias_right_part}'
|
||
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted, email_to,
|
||
subject=subject,
|
||
target_model=self.alias.alias_model_id.model
|
||
)
|
||
|
||
res_alias_1 = self.env['mail.test.gateway'].search([('name', '=', subject)])
|
||
res_alias_2 = self.env['mail.test.container.mc'].search([('name', '=', subject)])
|
||
self.assertTrue(bool(res_alias_1), 'First alias should always be respected')
|
||
self.assertEqual(bool(res_alias_2), container_created)
|
||
|
||
# --------------------------------------------------
|
||
# Email Management
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_bounce(self):
|
||
"""Incoming email: bounce using bounce alias: no record creation """
|
||
with self.mock_mail_gateway():
|
||
new_recs = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{self.alias_bounce}@{self.alias_domain}',
|
||
subject='Should bounce',
|
||
)
|
||
self.assertFalse(new_recs)
|
||
self.assertNotSentEmail()
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_bounce_other_recipients(self):
|
||
"""Incoming email: bounce processing: bounce should be computed even if not first recipient """
|
||
with self.mock_mail_gateway():
|
||
new_recs = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{self.alias.alias_name}@{self.alias_domain}, {self.alias_bounce}@{self.alias_domain}',
|
||
subject='Should bounce',
|
||
)
|
||
self.assertFalse(new_recs)
|
||
self.assertNotSentEmail()
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.addons.mail.models.mail_mail', 'odoo.models.unlink')
|
||
def test_message_route_write_to_catchall(self):
|
||
""" Writing directly to catchall should bounce """
|
||
# Test: no group created, email bounced
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'"My Super Catchall" <{self.alias_catchall}@{self.alias_domain}',
|
||
subject='Should Bounce')
|
||
self.assertFalse(record)
|
||
self.assertSentEmail(
|
||
self.mailer_daemon_email,
|
||
['whatever-2a840@postmaster.twitter.com'],
|
||
subject='Re: Should Bounce'
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_write_to_catchall_other_recipients_first(self):
|
||
""" Writing directly to catchall and a valid alias should take alias """
|
||
# Test: no group created, email bounced
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{self.alias_catchall}@{self.alias_domain}, {self.alias.alias_name}@{self.alias_domain}',
|
||
subject='Catchall Not Blocking'
|
||
)
|
||
# Test: one group created
|
||
self.assertEqual(len(record), 1, 'message_process: a new mail.test should have been created')
|
||
# No bounce email
|
||
self.assertNotSentEmail()
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_route_write_to_catchall_other_recipients_second(self):
|
||
""" Writing directly to catchall and a valid alias should take alias """
|
||
# Test: no group created, email bounced
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'{self.alias.alias_name}@{self.alias_domain}, {self.alias_catchall}@{self.alias_domain}',
|
||
subject='Catchall Not Blocking'
|
||
)
|
||
# Test: one group created
|
||
self.assertEqual(len(record), 1, 'message_process: a new mail.test should have been created')
|
||
# No bounce email
|
||
self.assertNotSentEmail()
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.addons.mail.models.mail_mail', 'odoo.models.unlink')
|
||
def test_message_route_write_to_catchall_other_recipients_invalid(self):
|
||
""" Writing to catchall and other unroutable recipients should bounce. """
|
||
# Test: no group created, email bounced
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.partner_1.email_formatted,
|
||
f'"My Super Catchall" <{self.alias_catchall}@{self.alias_domain}>, Unroutable <unroutable@{self.alias_domain}>',
|
||
subject='Should Bounce')
|
||
self.assertFalse(record)
|
||
self.assertSentEmail(
|
||
self.mailer_daemon_email,
|
||
['whatever-2a840@postmaster.twitter.com'],
|
||
subject='Re: Should Bounce'
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_alias(self):
|
||
""" Writing to bounce alias is considered as a bounce even if not multipart/report bounce structure """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
bounce_email_to = f'{self.alias_bounce}@{self.alias_domain}'
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.partner_1.email_formatted, bounce_email_to, subject='Undelivered Mail Returned to Sender')
|
||
self.assertFalse(record)
|
||
# No information found in bounce email -> not possible to do anything except avoiding email
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_from_mailer_demon(self):
|
||
""" MAILER_DAEMON emails are considered as bounce """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
record = self.format_and_process(MAIL_TEMPLATE, 'MAILER-DAEMON@example.com', f'groups@{self.alias_domain}', subject='Undelivered Mail Returned to Sender')
|
||
self.assertFalse(record)
|
||
# No information found in bounce email -> not possible to do anything except avoiding email
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_missing_final_recipient(self):
|
||
"""The Final-Recipient header is missing, the partner must be found thanks to the original mail message."""
|
||
email = test_mail_data.MAIL_BOUNCE.replace('Final-Recipient', 'XX')
|
||
email = email.replace('Original-Recipient', 'XX')
|
||
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
# no notification to find, won't be able to find the correct recipient
|
||
extra = self.fake_email.message_id
|
||
record = self.format_and_process(email, self.partner_1.email_formatted, f'{self.alias_bounce}@{self.alias_domain}', subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
# the partner will be found in the <mail.notification> res_partner_id
|
||
extra = self.fake_email.message_id
|
||
self.env['mail.notification'].create({
|
||
"res_partner_id": self.partner_1.id,
|
||
"mail_message_id": self.fake_email.id,
|
||
})
|
||
record = self.format_and_process(email, self.partner_1.email_formatted, f'{self.alias_bounce}@{self.alias_domain}', subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 1)
|
||
self.assertEqual(self.test_record.message_bounce, 1)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_multipart_alias(self):
|
||
""" Multipart/report bounce correctly make related partner bounce """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
bounce_email_to = f'{self.alias_bounce}@{self.alias_domain}'
|
||
record = self.format_and_process(test_mail_data.MAIL_BOUNCE, self.partner_1.email_formatted, bounce_email_to, subject='Undelivered Mail Returned to Sender')
|
||
self.assertFalse(record)
|
||
# Missing in reply to message_id -> cannot find original record
|
||
self.assertEqual(self.partner_1.message_bounce, 1)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_multipart_alias_reply(self):
|
||
""" Multipart/report bounce correctly make related partner and record found in bounce email bounce """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
notification = self.env['mail.notification'].create({
|
||
'mail_message_id': self.fake_email.id,
|
||
'res_partner_id': self.partner_1.id,
|
||
})
|
||
|
||
bounce_email_to = f'{self.alias_bounce}@{self.alias_domain}'
|
||
extra = self.fake_email.message_id
|
||
record = self.format_and_process(test_mail_data.MAIL_BOUNCE, self.partner_1.email_formatted, bounce_email_to, subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 1)
|
||
self.assertEqual(self.test_record.message_bounce, 1)
|
||
self.assertIn(
|
||
'This is the mail system at host mail2.test.ironsky.',
|
||
notification.failure_reason,
|
||
msg='Should store the bounce email body on the notification')
|
||
self.assertEqual(notification.failure_type, 'mail_bounce')
|
||
self.assertEqual(notification.notification_status, 'bounce')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_multipart_alias_whatever_from(self):
|
||
""" Multipart/report bounce correctly make related record found in bounce email bounce """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
bounce_email_to = f'{self.alias_bounce}@{self.alias_domain}'
|
||
extra = self.fake_email.message_id
|
||
record = self.format_and_process(test_mail_data.MAIL_BOUNCE, 'Whatever <what@ever.com>', bounce_email_to, subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 1)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_multipart_whatever_to_and_from(self):
|
||
""" Multipart/report bounce correctly make related record found in bounce email bounce """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
extra = self.fake_email.message_id
|
||
record = self.format_and_process(test_mail_data.MAIL_BOUNCE, 'Whatever <what@ever.com>', f'groups@{self.alias_domain}', subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 1)
|
||
|
||
# The local part of the FROM is not "MAILER-DAEMON", and the Content type is slightly
|
||
# different. Thanks to the report type, it still should be detected as a bounce email.
|
||
email = test_mail_data.MAIL_BOUNCE.replace('multipart/report;', 'multipart/report:')
|
||
email = email.replace('MAILER-DAEMON@mail2.test.ironsky', 'email@mail2.test.ironsky')
|
||
self.assertIn('report-type=delivery-status', email)
|
||
extra = self.fake_email.message_id
|
||
record = self.format_and_process(email, 'Whatever <what@ever.com>', f'groups@{self.alias_domain}', subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(self.test_record.message_bounce, 2)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink')
|
||
def test_message_process_bounce_records_channel(self):
|
||
""" Test blacklist allow to multi-bounce and auto update of discuss.channel """
|
||
other_record = self.env['mail.test.gateway'].create({
|
||
'email_from': f'Another name <{self.partner_1.email}>'
|
||
})
|
||
yet_other_record = self.env['mail.test.gateway'].create({
|
||
'email_from': f'Yet Another name <{self.partner_1.email.upper()}>'
|
||
})
|
||
test_channel = self.env['discuss.channel'].create({
|
||
'name': 'Test',
|
||
'channel_partner_ids': [(4, self.partner_1.id)],
|
||
'group_public_id': None,
|
||
})
|
||
self.fake_email.write({
|
||
'model': 'discuss.channel',
|
||
'res_id': test_channel.id,
|
||
})
|
||
self.assertIn(self.partner_1, test_channel.channel_partner_ids)
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.assertEqual(other_record.message_bounce, 0)
|
||
self.assertEqual(yet_other_record.message_bounce, 0)
|
||
|
||
extra = self.fake_email.message_id
|
||
for i in range(10):
|
||
record = self.format_and_process(
|
||
test_mail_data.MAIL_BOUNCE, f'A third name <{self.partner_1.email}>',
|
||
f'groups@{self.alias_domain}',
|
||
subject='Undelivered Mail Returned to Sender',
|
||
extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 10)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
self.assertEqual(other_record.message_bounce, 10)
|
||
self.assertEqual(yet_other_record.message_bounce, 10)
|
||
self.assertNotIn(self.partner_1, test_channel.channel_partner_ids)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_bounce_records_partner(self):
|
||
""" Test blacklist + bounce on ``res.partner`` model """
|
||
self.assertEqual(self.partner_1.message_bounce, 0)
|
||
self.fake_email.write({
|
||
'model': 'res.partner',
|
||
'res_id': self.partner_1.id,
|
||
})
|
||
|
||
extra = self.fake_email.message_id
|
||
record = self.format_and_process(test_mail_data.MAIL_BOUNCE, self.partner_1.email_formatted, f'groups@{self.alias_domain}', subject='Undelivered Mail Returned to Sender', extra=extra)
|
||
self.assertFalse(record)
|
||
self.assertEqual(self.partner_1.message_bounce, 1)
|
||
self.assertEqual(self.test_record.message_bounce, 0)
|
||
|
||
# --------------------------------------------------
|
||
# Thread formation
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail', 'odoo.tests')
|
||
def test_message_process_external_notification_reply(self):
|
||
"""Ensure responses bot messages are discussions."""
|
||
bot_notification_message = self._create_gateway_message(
|
||
self.test_record,
|
||
'bot_notif_message',
|
||
author_id=self.env.ref('base.partner_root').id,
|
||
message_type='auto_comment',
|
||
is_internal=True,
|
||
subtype_id=self.env.ref('mail.mt_note').id,
|
||
)
|
||
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, '',
|
||
subject='Reply to bot notif',
|
||
extra=f'References: {bot_notification_message.message_id}'
|
||
)
|
||
new_msg = self.test_record.message_ids[0]
|
||
self.assertFalse(new_msg.is_internal, "Responses to messages sent by odoobot should always be public.")
|
||
self.assertEqual(new_msg.parent_id, bot_notification_message)
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'))
|
||
|
||
# Also check the regular case
|
||
some_notification_message = self._create_gateway_message(
|
||
self.test_record,
|
||
'some_notif_message',
|
||
message_type='notification',
|
||
is_internal=True,
|
||
subtype_id=self.env.ref('mail.mt_note').id,
|
||
)
|
||
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, '',
|
||
subject='Reply to some notif',
|
||
extra=f'References: {some_notification_message.message_id}'
|
||
)
|
||
new_msg = self.test_record.message_ids[0]
|
||
self.assertTrue(new_msg.is_internal, "Responses to messages sent by anyone but odoobot should keep"
|
||
"the 'is_internal' value of the parent.")
|
||
self.assertEqual(new_msg.parent_id, some_notification_message)
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_note'))
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_in_reply_to(self):
|
||
""" Incoming email using in-rely-to should go into the right destination even with a wrong destination """
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, 'valid.other@gmail.com', f'erroneous@{self.alias_domain}',
|
||
subject='Re: news', extra=f'In-Reply-To:\r\n\t{self.fake_email.message_id}\n')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count + 1)
|
||
self.assertEqual(self.fake_email.child_ids, self.test_record.message_ids[0])
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_references(self):
|
||
""" Incoming email using references should go into the right destination even with a wrong destination """
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'erroneous@{self.alias_domain}',
|
||
extra=f'References: <2233@a.com>\r\n\t<3edss_dsa@b.com> {self.fake_email.message_id}')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count + 1)
|
||
self.assertEqual(self.fake_email.child_ids, self.test_record.message_ids[0])
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail', 'odoo.tests')
|
||
def test_message_process_references_multi_parent(self):
|
||
""" Incoming email with multiple references """
|
||
reply1 = self._create_gateway_message(
|
||
self.test_record, 'reply1', parent_id=self.fake_email.id,
|
||
)
|
||
reply2 = self._create_gateway_message(
|
||
self.test_record, 'reply2', parent_id=self.fake_email.id,
|
||
subtype_id=self.env['ir.model.data']._xmlid_to_res_id('mail.mt_note'),
|
||
)
|
||
reply1_1 = self._create_gateway_message(
|
||
self.test_record, 'reply1_1', parent_id=reply1.id,
|
||
subtype_id=self.env['ir.model.data']._xmlid_to_res_id('mail.mt_note'),
|
||
)
|
||
reply2_1 = self._create_gateway_message(
|
||
self.test_record, 'reply2_1', parent_id=reply2.id,
|
||
)
|
||
|
||
# reply to reply1 using multiple references
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Reply to reply1',
|
||
extra=f'References: {reply1.message_id} {self.fake_email.message_id}'
|
||
)
|
||
new_msg = self.test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, self.fake_email, 'Mail: flattening attach to original message')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: reply to a comment should be a comment')
|
||
|
||
# ordering should not impact
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Reply to reply1 (order issue)',
|
||
extra=f'References: {self.fake_email.message_id} {reply1.message_id}'
|
||
)
|
||
new_msg = self.test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, self.fake_email, 'Mail: flattening attach to original message')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: reply to a comment should be a comment')
|
||
|
||
# history with last one being a note
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Reply to reply1_1',
|
||
extra=f'References: {reply1_1.message_id} {self.fake_email.message_id}'
|
||
)
|
||
new_msg = self.test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, self.fake_email, 'Mail: flattening attach to original message')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_note'), 'Mail: reply to a note should be a note')
|
||
|
||
# messed up history (two child branches): gateway initial parent is newest one
|
||
# (then may change with flattening when posting on record)
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}',
|
||
subject='Reply to reply2_1 (with noise)',
|
||
extra=f'References: {reply1_1.message_id} {reply2_1.message_id}'
|
||
)
|
||
new_msg = self.test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, self.fake_email, 'Mail: flattening attach to original message')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: parent should be a comment (before flattening)')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail', 'odoo.tests')
|
||
def test_message_process_references_multi_parent_notflat(self):
|
||
""" Incoming email with multiple references with ``_mail_flat_thread``
|
||
being False (mail.group/discuss.channel behavior like). """
|
||
test_record = self.env['mail.test.gateway.groups'].create({
|
||
'alias_name': 'test.gateway',
|
||
'name': 'Test',
|
||
'email_from': 'ignasse@example.com',
|
||
})
|
||
|
||
# Set a first message on public group to test update and hierarchy
|
||
first_msg = self._create_gateway_message(test_record, 'first_msg')
|
||
reply1 = self._create_gateway_message(
|
||
test_record, 'reply1', parent_id=first_msg.id,
|
||
)
|
||
reply2 = self._create_gateway_message(
|
||
test_record, 'reply2', parent_id=first_msg.id,
|
||
subtype_id=self.env['ir.model.data']._xmlid_to_res_id('mail.mt_note'),
|
||
)
|
||
reply1_1 = self._create_gateway_message(
|
||
test_record, 'reply1_1', parent_id=reply1.id,
|
||
subtype_id=self.env['ir.model.data']._xmlid_to_res_id('mail.mt_note'),
|
||
)
|
||
reply2_1 = self._create_gateway_message(
|
||
test_record, 'reply2_1', parent_id=reply2.id,
|
||
)
|
||
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.gateway@{self.alias_domain}',
|
||
subject='Reply to reply1',
|
||
extra=f'References: {reply1.message_id}'
|
||
)
|
||
new_msg = test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, first_msg, 'Mail: pseudo no flattening: getting up one level (reply1 parent)')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: parent should be a comment')
|
||
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.gateway@{self.alias_domain}',
|
||
subject='Reply to reply1_1 (with noise)',
|
||
extra=f'References: {reply1_1.message_id} {reply1.message_id} {reply1.message_id}'
|
||
)
|
||
new_msg = test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, reply1, 'Mail: pseudo no flattening: getting up one level (reply1_1 parent)')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_note'), 'Mail: reply to a note should be a note')
|
||
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.gateway@{self.alias_domain}',
|
||
subject='Reply to reply2_1 (with noise)',
|
||
extra=f'References: {reply2_1.message_id} {reply1_1.message_id}'
|
||
)
|
||
new_msg = test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, reply2, 'Mail: pseudo no flattening: getting up one level (reply2_1 parent')
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: parent should be a comment')
|
||
|
||
# no references: new discussion thread started
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.gateway@{self.alias_domain}',
|
||
subject='New thread',
|
||
extra='References:'
|
||
)
|
||
new_thread = test_record.message_ids[0]
|
||
self.assertFalse(new_thread.parent_id, 'Mail: pseudo no flattening: no parent means new thread')
|
||
self.assertEqual(new_thread.subject, 'New thread')
|
||
self.assertEqual(new_thread.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: parent should be a comment')
|
||
|
||
# mixed up references: newer message wins
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.gateway@{self.alias_domain}',
|
||
subject='New thread',
|
||
extra=f'References: {new_thread.message_id} {reply1_1.message_id}'
|
||
)
|
||
new_msg = test_record.message_ids[0]
|
||
self.assertEqual(new_msg.parent_id, new_thread)
|
||
self.assertEqual(new_msg.subtype_id, self.env.ref('mail.mt_comment'), 'Mail: parent should be a comment')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_references_external(self):
|
||
""" Incoming email being a reply to an external email processed by odoo should update thread accordingly """
|
||
new_message_id = '<ThisIsTooMuchFake.MonsterEmail.789@agrolait.com>'
|
||
self.fake_email.write({
|
||
'message_id': new_message_id
|
||
})
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'erroneous@{self.alias_domain}',
|
||
extra=f'References: <2233@a.com>\r\n\t<3edss_dsa@b.com> {self.fake_email.message_id}')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count + 1)
|
||
self.assertEqual(self.fake_email.child_ids, self.test_record.message_ids[0])
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_references_external_buggy_message_id(self):
|
||
"""
|
||
Incoming email being a reply to an external email processed by
|
||
odoo should update thread accordingly. Special case when the
|
||
external mail service wrongly folds the message_id on several
|
||
lines.
|
||
"""
|
||
new_message_id = '<ThisIsTooMuchFake.MonsterEmail.789@agrolait.com>'
|
||
buggy_message_id = new_message_id.replace('MonsterEmail', 'Monster\r\n Email')
|
||
self.fake_email.write({
|
||
'message_id': new_message_id
|
||
})
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'erroneous@{self.alias_domain}',
|
||
extra=f'References: <2233@a.com>\r\n\t<3edss_dsa@b.com> {buggy_message_id}')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count + 1)
|
||
self.assertEqual(self.fake_email.child_ids, self.test_record.message_ids[0])
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_references_forward(self):
|
||
""" Incoming email using references but with alias forward should not go into references destination """
|
||
self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test.alias',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.container').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
res_test = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.alias@{self.alias_domain}',
|
||
subject='My Dear Forward',
|
||
extra=f'References: <2233@a.com>\r\n\t<3edss_dsa@b.com> {self.fake_email.message_id}',
|
||
target_model='mail.test.container')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count)
|
||
self.assertEqual(len(self.fake_email.child_ids), 0)
|
||
self.assertEqual(res_test.name, 'My Dear Forward')
|
||
self.assertEqual(len(res_test.message_ids), 1)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_references_forward_same_model(self):
|
||
""" Incoming email using references but with alias forward on same model should be considered as a reply """
|
||
self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test.alias',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.gateway').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
res_test = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test.alias@{self.alias_domain}',
|
||
subject='My Dear Forward',
|
||
extra=f'References: <2233@a.com>\r\n\t<3edss_dsa@b.com> {self.fake_email.message_id}',
|
||
target_model='mail.test.container')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count + 1)
|
||
self.assertEqual(len(self.fake_email.child_ids), 1)
|
||
self.assertFalse(res_test)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_references_forward_cc(self):
|
||
""" Incoming email using references but with alias forward in CC should be considered as a repy (To > Cc) """
|
||
self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test.alias',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.container').id,
|
||
'alias_contact': 'everyone',
|
||
})
|
||
init_msg_count = len(self.test_record.message_ids)
|
||
res_test = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from,
|
||
f'{self.alias_catchall}@{self.alias_domain}',
|
||
cc=f'test.alias@{self.alias_domain}',
|
||
subject='My Dear Forward',
|
||
extra=f'References: <2233@a.com>\r\n\t<3edss_dsa@b.com> {self.fake_email.message_id}',
|
||
target_model='mail.test.container')
|
||
|
||
self.assertEqual(len(self.test_record.message_ids), init_msg_count + 1)
|
||
self.assertEqual(len(self.fake_email.child_ids), 1)
|
||
self.assertFalse(res_test)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models.unlink', 'odoo.addons.mail.models.mail_mail')
|
||
def test_message_process_reply_to_new_thread(self):
|
||
""" Test replies not being considered as replies but use destination information instead (aka, mass post + specific reply to using aliases) """
|
||
# shorten company name to prevent 68 character formatting from
|
||
# triggering and making the assert missmatch.
|
||
# See _notify_get_reply_to_formatted_email method
|
||
self.user_employee.company_id.name = "Forced"
|
||
first_record = self.env['mail.test.simple'].with_user(self.user_employee).create({'name': 'Replies to Record'})
|
||
record_msg = first_record.message_post(
|
||
subject='Discussion',
|
||
reply_to_force_new=False,
|
||
subtype_xmlid='mail.mt_comment',
|
||
)
|
||
self.assertEqual(
|
||
record_msg.reply_to,
|
||
formataddr((f'{self.user_employee.company_id.name} {first_record.name}',
|
||
f'{self.alias_catchall}@{self.alias_domain}'))
|
||
)
|
||
mail_msg = first_record.message_post(
|
||
subject='Replies to Record',
|
||
reply_to=f'groups@{self.alias_domain}',
|
||
reply_to_force_new=True,
|
||
subtype_xmlid='mail.mt_comment',
|
||
)
|
||
self.assertEqual(mail_msg.reply_to, f'groups@{self.alias_domain}')
|
||
|
||
# reply to mail but should be considered as a new mail for alias
|
||
msgID = '<this.is.duplicate.test@iron.sky>'
|
||
res_test = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, record_msg.reply_to, cc='',
|
||
subject='Re: Replies to Record', extra=f'In-Reply-To: {record_msg.message_id}',
|
||
msg_id=msgID, target_model='mail.test.simple')
|
||
incoming_msg = self.env['mail.message'].search([('message_id', '=', msgID)])
|
||
self.assertFalse(res_test)
|
||
self.assertEqual(incoming_msg.model, 'mail.test.simple')
|
||
self.assertEqual(incoming_msg.parent_id, first_record.message_ids[-1])
|
||
self.assertTrue(incoming_msg.res_id == first_record.id)
|
||
|
||
# reply to mail but should be considered as a new mail for alias
|
||
msgID = '<this.is.for.testing@iron.sky>'
|
||
res_test = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, mail_msg.reply_to, cc='',
|
||
subject='Re: Replies to Record', extra=f'In-Reply-To: {mail_msg.message_id}',
|
||
msg_id=msgID, target_model='mail.test.gateway')
|
||
incoming_msg = self.env['mail.message'].search([('message_id', '=', msgID)])
|
||
self.assertEqual(len(res_test), 1)
|
||
self.assertEqual(res_test.name, 'Re: Replies to Record')
|
||
self.assertEqual(incoming_msg.model, 'mail.test.gateway')
|
||
self.assertFalse(incoming_msg.parent_id)
|
||
self.assertTrue(incoming_msg.res_id == res_test.id)
|
||
|
||
# --------------------------------------------------
|
||
# Gateway / Record synchronization
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_gateway_values_base64_image(self):
|
||
"""New record with mail that contains base64 inline image."""
|
||
target_model = "mail.test.field.type"
|
||
alias = self.env["mail.alias"].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
"alias_name": "base64-lover",
|
||
"alias_model_id": self.env["ir.model"]._get(target_model).id,
|
||
"alias_defaults": "{}",
|
||
"alias_contact": "everyone",
|
||
})
|
||
record = self.format_and_process(
|
||
test_mail_data.MAIL_TEMPLATE_EXTRA_HTML, self.email_from,
|
||
f'{alias.alias_name}@{self.alias_domain}',
|
||
subject='base64 image to alias',
|
||
target_model=target_model,
|
||
extra_html='<img src="data:image/png;base64,iV/+OkI=">',
|
||
)
|
||
self.assertEqual(record.type, "first")
|
||
self.assertEqual(len(record.message_ids[0].attachment_ids), 1)
|
||
self.assertEqual(record.message_ids[0].attachment_ids[0].name, "image0")
|
||
self.assertEqual(record.message_ids[0].attachment_ids[0].type, "binary")
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_gateway_values_base64_image_walias(self):
|
||
"""New record with mail that contains base64 inline image + default values
|
||
coming from alias."""
|
||
target_model = "mail.test.field.type"
|
||
alias = self.env["mail.alias"].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
"alias_name": "base64-lover",
|
||
"alias_model_id": self.env["ir.model"]._get(target_model).id,
|
||
"alias_defaults": "{'type': 'second'}",
|
||
"alias_contact": "everyone",
|
||
})
|
||
record = self.format_and_process(
|
||
test_mail_data.MAIL_TEMPLATE_EXTRA_HTML, self.email_from,
|
||
f'{alias.alias_name}@{self.alias_domain}',
|
||
subject='base64 image to alias',
|
||
target_model=target_model,
|
||
extra_html='<img src="data:image/png;base64,iV/+OkI=">',
|
||
)
|
||
self.assertEqual(record.type, "second")
|
||
self.assertEqual(len(record.message_ids[0].attachment_ids), 1)
|
||
self.assertEqual(record.message_ids[0].attachment_ids[0].name, "image0")
|
||
self.assertEqual(record.message_ids[0].attachment_ids[0].type, "binary")
|
||
|
||
# --------------------------------------------------
|
||
# Thread formation: mail gateway corner cases
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_extra_model_res_id(self):
|
||
""" Incoming email with ref holding model / res_id but that does not match any message in the thread: must raise since OpenERP saas-3 """
|
||
self.assertRaises(ValueError,
|
||
self.format_and_process, MAIL_TEMPLATE,
|
||
self.partner_1.email_formatted, f'noone@{self.alias_domain}', subject='spam',
|
||
extra=f'In-Reply-To: <12321321-openerp-{self.test_record.id}-{self.test_record._name}@{socket.gethostname()}>')
|
||
|
||
# when 6.1 messages are present, compat mode is available
|
||
# Odoo 10 update: compat mode has been removed and should not work anymore
|
||
self.fake_email.write({'message_id': False})
|
||
# Do: compat mode accepts partial-matching emails
|
||
self.assertRaises(
|
||
ValueError,
|
||
self.format_and_process, MAIL_TEMPLATE,
|
||
self.partner_1.email_formatted, f'noone@{self.alias_domain}>', subject='spam',
|
||
extra=f'In-Reply-To: <12321321-openerp-{self.test_record.id}-mail.test.gateway@{socket.gethostname()}>')
|
||
|
||
# Test created messages
|
||
self.assertEqual(len(self.test_record.message_ids), 1)
|
||
self.assertEqual(len(self.test_record.message_ids[0].child_ids), 0)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_duplicate(self):
|
||
""" Duplicate emails (same message_id) are not processed """
|
||
self.alias.write({'alias_force_thread_id': self.test_record.id,})
|
||
|
||
# Post a base message
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Re: super cats', msg_id='<123.456.diff1@agrolait.com>')
|
||
self.assertFalse(record)
|
||
self.assertEqual(len(self.test_record.message_ids), 2)
|
||
|
||
# Do: due to some issue, same email goes back into the mailgateway
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'groups@{self.alias_domain}', subject='Re: news',
|
||
msg_id='<123.456.diff1@agrolait.com>', extra='In-Reply-To: <1198923581.41972151344608186799.JavaMail.diff1@agrolait.com>\n')
|
||
self.assertFalse(record)
|
||
self.assertEqual(len(self.test_record.message_ids), 2)
|
||
|
||
# Test: message_id is still unique
|
||
no_of_msg = self.env['mail.message'].search_count([('message_id', 'ilike', '<123.456.diff1@agrolait.com>')])
|
||
self.assertEqual(no_of_msg, 1,
|
||
'message_process: message with already existing message_id should not have been duplicated')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_crash_wrong_model(self):
|
||
""" Incoming email with model that does not accepts incoming emails must raise """
|
||
self.assertRaises(ValueError,
|
||
self.format_and_process,
|
||
MAIL_TEMPLATE, self.email_from, f'noone@{self.alias_domain}',
|
||
subject='spam', extra='', model='res.country')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_crash_no_data(self):
|
||
""" Incoming email without model and without alias must raise """
|
||
self.assertRaises(ValueError,
|
||
self.format_and_process,
|
||
MAIL_TEMPLATE, self.email_from, f'noone@{self.alias_domain}',
|
||
subject='spam', extra='')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread', 'odoo.models')
|
||
def test_message_process_fallback(self):
|
||
""" Incoming email with model that accepting incoming emails as fallback """
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'noone@{self.alias_domain}',
|
||
subject='Spammy', extra='', model='mail.test.gateway')
|
||
self.assertEqual(len(record), 1)
|
||
self.assertEqual(record.name, 'Spammy')
|
||
self.assertEqual(record._name, 'mail.test.gateway')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_file_encoding(self):
|
||
""" Incoming email with file encoding """
|
||
file_content = 'Hello World'
|
||
for encoding in ['', 'UTF-8', 'UTF-16LE', 'UTF-32BE']:
|
||
file_content_b64 = base64.b64encode(file_content.encode(encoding or 'utf-8')).decode()
|
||
record = self.format_and_process(test_mail_data.MAIL_FILE_ENCODING,
|
||
self.email_from, f'groups@{self.alias_domain}',
|
||
subject=f'Test Charset {encoding or "Unset"}',
|
||
charset=f'; charset="{encoding}"' if encoding else '',
|
||
content=file_content_b64
|
||
)
|
||
attachment = record.message_ids.attachment_ids
|
||
self.assertEqual(file_content, attachment.raw.decode(encoding or 'utf-8'))
|
||
if encoding not in ['', 'UTF-8']:
|
||
self.assertNotEqual(file_content, attachment.raw.decode('utf-8'))
|
||
|
||
def test_message_hebrew_iso8859_8_i(self):
|
||
# This subject was found inside an email of one of our customer.
|
||
# The charset is iso-8859-8-i which isn't natively supported by
|
||
# python, check that Odoo is still capable of decoding it.
|
||
subject = "בוקר טוב! צריך איימק ושתי מסכים"
|
||
encoded_subject = "=?iso-8859-8-i?B?4eX3+CDo5eEhIPb46eog4Onp7vcg5fn66SDu8evp7Q==?="
|
||
|
||
# This content was made up using google translate. The charset
|
||
# is iso-8859-8 which is natively supported by python.
|
||
charset = "iso-8859-8"
|
||
content = "שלום וברוכים הבאים למקרה המבחן הנפלא הזה"
|
||
encoded_content = base64.b64encode(content.encode(charset)).decode()
|
||
|
||
with RecordCapturer(self.env['mail.test.gateway'], []) as capture:
|
||
mail = test_mail_data.MAIL_FILE_ENCODING.format(
|
||
msg_id="<test_message_hebrew_iso8859_8_i@iron.sky>",
|
||
subject=encoded_subject,
|
||
charset=f'; charset="{charset}"',
|
||
content=encoded_content,
|
||
)
|
||
self.env['mail.thread'].message_process('mail.test.gateway', mail)
|
||
|
||
capture.records.ensure_one()
|
||
self.assertEqual(capture.records.name, subject)
|
||
self.assertEqual(
|
||
capture.records.message_ids.attachment_ids.raw.decode(charset),
|
||
content
|
||
)
|
||
|
||
def test_message_windows_874(self):
|
||
# Email for Thai customers who use Microsoft email service.
|
||
# The charset is windows-874 which isn't natively supported by
|
||
# python, check that Odoo is still capable of decoding it.
|
||
# windows-874 is the Microsoft equivalent of cp874.
|
||
with self.mock_mail_gateway(), \
|
||
RecordCapturer(self.env['mail.test.gateway'], []) as capture:
|
||
self.env['mail.thread'].message_process('mail.test.gateway', THAI_EMAIL_WINDOWS_874)
|
||
capture.records.ensure_one()
|
||
self.assertEqual(capture.records.name, 'เรื่อง')
|
||
self.assertEqual(str(capture.records.message_ids.body), '<pre>ร่างกาย</pre>\n')
|
||
|
||
# --------------------------------------------------
|
||
# Corner cases / Bugs during message process
|
||
# --------------------------------------------------
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_file_encoding_ascii(self):
|
||
""" Incoming email containing an xml attachment with unknown characters (<28>) but an ASCII charset should not
|
||
raise an Exception. UTF-8 is used as a safe fallback.
|
||
"""
|
||
record = self.format_and_process(test_mail_data.MAIL_MULTIPART_INVALID_ENCODING, self.email_from, f'groups@{self.alias_domain}')
|
||
|
||
self.assertEqual(record.message_ids.attachment_ids.name, 'bis3_with_error_encoding_address.xml')
|
||
# NB: the xml received by email contains b"Chauss\xef\xbf\xbd\xef\xbf\xbde" with "\xef\xbf\xbd" being the
|
||
# replacement character <20> in UTF-8.
|
||
# When calling `_message_parse_extract_payload`, `part.get_content()` will be called on the attachment part of
|
||
# the email, triggering the decoding of the base64 attachment, so b"Chauss\xef\xbf\xbd\xef\xbf\xbde" is
|
||
# first retrieved. Then, `get_text_content` in `email` tries to decode this using the charset of the email
|
||
# part, i.e: `content.decode('us-ascii', errors='replace')`. So the errors are replaced using the Unicode
|
||
# replacement marker and the string "Chauss<73><73><EFBFBD><EFBFBD><EFBFBD><EFBFBD>e" is used to create the attachment.
|
||
# This explains the multiple "<22>" in the attachment.
|
||
self.assertIn("Chauss<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>e de Bruxelles", record.message_ids.attachment_ids.raw.decode())
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_file_omitted_charset_xml(self):
|
||
""" For incoming email containing an xml attachment with omitted charset and containing an UTF8 payload we
|
||
should parse the attachment using UTF-8.
|
||
"""
|
||
record = self.format_and_process(test_mail_data.MAIL_MULTIPART_OMITTED_CHARSET_XML, self.email_from, f'groups@{self.alias_domain}')
|
||
self.assertEqual(record.message_ids.attachment_ids.name, 'bis3.xml')
|
||
self.assertEqual("<Invoice>Chaussée de Bruxelles</Invoice>", record.message_ids.attachment_ids.raw.decode())
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_file_omitted_charset_csv(self):
|
||
""" For incoming email containing a csv attachment with omitted charset and containing an UTF8 payload we
|
||
should parse the attachment using UTF-8.
|
||
"""
|
||
record = self.format_and_process(test_mail_data.MAIL_MULTIPART_OMITTED_CHARSET_CSV, self.email_from, f'groups@{self.alias_domain}')
|
||
self.assertEqual(record.message_ids.attachment_ids.name, 'bis3.csv')
|
||
self.assertEqual("\ufeffAuftraggeber;LieferadresseStraße;", record.message_ids.attachment_ids.raw.decode())
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_process_file_omitted_charset_txt(self):
|
||
""" For incoming email containing a txt attachment with omitted charset and containing an UTF8 payload we
|
||
should parse the attachment using UTF-8.
|
||
"""
|
||
test_string = ("Äpfel und Birnen sind Früchte, die im Herbst geerntet werden. In der Nähe des Flusses steht ein großes, "
|
||
"altes Schloss. Über den Dächern sieht man oft Vögel fliegen. Müller und Schröder sind typische deutsche Nachnamen. "
|
||
"Die Straße, in der ich wohne, heißt „Bachstraße“ und ist sehr ruhig. Überall im Wald wachsen Bäume mit kräftigen Ästen. "
|
||
"Können wir uns über die Pläne für das nächste Wochenende unterhalten?")
|
||
record = self.format_and_process(test_mail_data.MAIL_MULTIPART_OMITTED_CHARSET_TXT, self.email_from, f'groups@{self.alias_domain}')
|
||
self.assertEqual(record.message_ids.attachment_ids.name, 'bis3.txt')
|
||
self.assertEqual(test_string, record.message_ids.attachment_ids.raw.decode())
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_route_reply_model_none(self):
|
||
"""
|
||
Test the message routing and reply functionality when the model is None.
|
||
|
||
This test case verifies the behavior of the message routing and reply process
|
||
when the 'model' field of a mail.message is set to None. It checks that the
|
||
message is correctly processed and associated with the appropriate record.
|
||
The code invokes function `format_and_process` to automatically test rounting
|
||
and then makes checks on created record.
|
||
|
||
"""
|
||
message = self.env['mail.message'].create({
|
||
'body': '<p>test</p>',
|
||
'email_from': self.email_from,
|
||
'message_type': 'email_outgoing',
|
||
'model': None,
|
||
'res_id': None,
|
||
})
|
||
|
||
self.env['mail.alias'].create({
|
||
'alias_domain_id': self.mail_alias_domain.id,
|
||
'alias_name': 'test',
|
||
'alias_model_id': self.env['ir.model']._get('mail.test.gateway').id,
|
||
})
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE, self.email_from, f'test@{self.alias_domain}',
|
||
subject=message.message_id, extra=f'In-Reply-To:\r\n\t{message.message_id}\n',
|
||
model=None)
|
||
|
||
self.assertTrue(record)
|
||
self.assertEqual(record._name, 'mail.test.gateway')
|
||
self.assertEqual(record.message_ids.subject, message.message_id)
|
||
self.assertFalse(record.message_ids.parent_id)
|
||
|
||
|
||
@tagged('mail_gateway', 'mail_loop')
|
||
class TestMailGatewayLoops(MailGatewayCommon):
|
||
|
||
@classmethod
|
||
def setUpClass(cls):
|
||
super().setUpClass()
|
||
cls.env['ir.config_parameter'].sudo().set_param('mail.gateway.loop.minutes', 30)
|
||
cls.env['ir.config_parameter'].sudo().set_param('mail.gateway.loop.threshold', 5)
|
||
|
||
cls.env['mail.gateway.allowed'].create([
|
||
{'email': 'Bob@EXAMPLE.com'},
|
||
{'email': '"Alice From Example" <alice@EXAMPLE.com>'},
|
||
{'email': '"Eve From Example" <eve@EXAMPLE.com>'},
|
||
])
|
||
|
||
cls.alias_ticket = cls.env['mail.alias'].create({
|
||
'alias_contact': 'everyone',
|
||
'alias_domain_id': cls.mail_alias_domain.id,
|
||
'alias_model_id': cls.env['ir.model']._get_id('mail.test.ticket'),
|
||
'alias_name': 'test.ticket',
|
||
})
|
||
cls.alias_other = cls.env['mail.alias'].create({
|
||
'alias_contact': 'everyone',
|
||
'alias_domain_id': cls.mail_alias_domain.id,
|
||
'alias_model_id': cls.env['ir.model']._get_id('mail.test.gateway'),
|
||
'alias_name': 'test.gateway',
|
||
})
|
||
|
||
# recipients
|
||
cls.customer_email = "customer@test.example.com"
|
||
cls.alias_partner, cls.other_partner = cls.env['res.partner'].create([
|
||
{
|
||
'email': f'"Stupid Idea" <{cls.alias_other.alias_name}@{cls.alias_other.alias_domain}>',
|
||
'name': 'Stupid Idea',
|
||
}, {
|
||
'email': '"Other Customer" <other.customer@test.example.com>',
|
||
'name': 'Other Customer',
|
||
}
|
||
])
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread')
|
||
@patch.object(Cursor, 'now', lambda *args, **kwargs: datetime(2022, 1, 1, 10, 0, 0))
|
||
def test_routing_loop_alias_create(self):
|
||
"""Test the limit on the number of record we can create by alias."""
|
||
# Send an email 2 hours ago, should not have an impact on more recent emails
|
||
with patch.object(Cursor, 'now', lambda *args, **kwargs: datetime(2022, 1, 1, 8, 0, 0)):
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
self.email_from,
|
||
f'{self.alias_ticket.alias_name}@{self.alias_domain}',
|
||
subject='Test alias loop old',
|
||
target_model=self.alias_ticket.alias_model_id.model,
|
||
)
|
||
|
||
for i in range(5):
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
self.email_from,
|
||
f'{self.alias_ticket.alias_name}@{self.alias_domain}',
|
||
subject=f'Test alias loop {i}',
|
||
target_model=self.alias_ticket.alias_model_id.model,
|
||
)
|
||
|
||
records = self.env['mail.test.ticket'].search([('name', 'ilike', 'Test alias loop %')])
|
||
self.assertEqual(len(records), 6, 'Should have created 6 <mail.test.gateway>')
|
||
self.assertEqual(set(records.mapped('email_from')), {self.email_from},
|
||
msg='Should have automatically filled the email field')
|
||
|
||
for email_from, exp_to in [
|
||
(self.email_from, formataddr(("Sylvie Lelitre", "test.sylvie.lelitre@agrolait.com"))),
|
||
(self.email_from.upper(), formataddr(("SYLVIE LELITRE", "test.sylvie.lelitre@agrolait.com"))),
|
||
]:
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
email_from,
|
||
f'{self.alias_ticket.alias_name}@{self.alias_domain}',
|
||
subject='Test alias loop X',
|
||
target_model=self.alias_ticket.alias_model_id.model,
|
||
return_path=email_from,
|
||
)
|
||
|
||
new_record = self.env['mail.test.ticket'].search([('name', '=', 'Test alias loop X')])
|
||
self.assertFalse(
|
||
new_record,
|
||
msg='The loop should have been detected and the record should not have been created')
|
||
|
||
self.assertSentEmail(f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>', [exp_to])
|
||
bounce_references = self._mails[0]['references']
|
||
self.assertIn('-loop-detection-bounce-email@', bounce_references,
|
||
msg='The "bounce email" tag must be in the reference')
|
||
|
||
# The reply to the bounce email must be ignored
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
'alice@example.com', # whitelisted from, should be taken into account
|
||
f'{self.alias_ticket.alias_name}@{self.alias_domain}',
|
||
subject='Test alias loop X',
|
||
target_model=self.alias_ticket.alias_model_id.model,
|
||
return_path=self.email_from,
|
||
extra=f'References: {bounce_references}',
|
||
)
|
||
self.assertNotSentEmail()
|
||
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
'alice@example.com', # whitelisted from, should be taken into account
|
||
f'{self.alias_ticket.alias_name}@{self.alias_domain}',
|
||
subject='Test alias loop X',
|
||
target_model=self.alias_ticket.alias_model_id.model,
|
||
return_path=self.email_from,
|
||
extra=f'In-Reply-To: {bounce_references}',
|
||
)
|
||
self.assertNotSentEmail()
|
||
|
||
# Email address in the whitelist should not have the restriction
|
||
for i in range(10):
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
'alice@example.com',
|
||
f'{self.alias_ticket.alias_name}@{self.alias_domain}',
|
||
subject=f'Whitelist test alias loop {i}',
|
||
target_model=self.alias_ticket.alias_model_id.model,
|
||
)
|
||
records = self.env['mail.test.ticket'].search([('name', 'ilike', 'Whitelist test alias loop %')])
|
||
self.assertEqual(len(records), 10, msg='Email whitelisted should not have the restriction')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread')
|
||
def test_routing_loop_alias_mix(self):
|
||
""" Test loop detection in case of multiples routes, just be sure all
|
||
routes are checked and models checked once. """
|
||
# create 2 update-records aliases and 1 new-record alias on same model
|
||
test_updates = self.env['mail.test.gateway.groups'].create([
|
||
{
|
||
'alias_name': 'test.update1',
|
||
'name': 'Update1',
|
||
}, {
|
||
'alias_name': 'test.update2',
|
||
'name': 'Update2',
|
||
},
|
||
])
|
||
alias_gateway_group, alias_ticket_other = self.env['mail.alias'].create([
|
||
{
|
||
'alias_contact': 'everyone',
|
||
'alias_model_id': self.env['ir.model']._get_id('mail.test.gateway.groups'),
|
||
'alias_name': 'test.new',
|
||
}, {
|
||
'alias_contact': 'everyone',
|
||
'alias_model_id': self.env['ir.model']._get_id('mail.test.ticket'),
|
||
'alias_name': 'test.ticket.other',
|
||
}
|
||
])
|
||
|
||
_original_ticket_sc = MailTestTicket.search_count
|
||
_original_groups_sc = MailTestGatewayGroups.search_count
|
||
_original_rgr = Message._read_group
|
||
with self.mock_mail_gateway(), \
|
||
patch.object(MailTestTicket, 'search_count', autospec=True, side_effect=_original_ticket_sc) as mock_ticket_sc, \
|
||
patch.object(MailTestGatewayGroups, 'search_count', autospec=True, side_effect=_original_groups_sc) as mock_groups_sc, \
|
||
patch.object(Message, '_read_group', autospec=True, side_effect=_original_rgr) as mock_msg_rgr:
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
self.other_partner.email_formatted,
|
||
f'"Super Help" <{self.alias_ticket.alias_name}@{self.alias_ticket.alias_domain}>,'
|
||
f'{test_updates[0].alias_id.display_name}, {test_updates[1].alias_id.display_name}, '
|
||
f'{alias_gateway_group.display_name}, {alias_ticket_other.display_name}',
|
||
subject='Valid Inquiry',
|
||
return_path=self.other_partner.email_formatted,
|
||
target_model='mail.test.ticket',
|
||
)
|
||
self.assertEqual(mock_ticket_sc.call_count, 1, 'Two alias creating tickets but one check anyway')
|
||
self.assertEqual(mock_groups_sc.call_count, 1, 'One alias creating groups')
|
||
self.assertEqual(mock_msg_rgr.call_count, 1, 'Only one model updating records, one call even if two aliases')
|
||
self.assertEqual(
|
||
len(self.env['mail.test.ticket'].search([('name', '=', 'Valid Inquiry')])),
|
||
2, 'One by creating alias, as no loop was detected'
|
||
)
|
||
|
||
# create 'looping' history by pre-creating messages on a thread -> should block future incoming emails
|
||
self.env['mail.message'].create([
|
||
{
|
||
'author_id': self.other_partner.id,
|
||
'model': test_updates[0]._name,
|
||
'res_id': test_updates[0].id,
|
||
} for x in range(4) # 4 + 1 posted before = 5 aka threshold
|
||
])
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
self.other_partner.email_formatted,
|
||
f'"Super Help" <{self.alias_ticket.alias_name}@{self.alias_ticket.alias_domain}>,'
|
||
f'{test_updates[0].alias_id.display_name}, {test_updates[1].alias_id.display_name}, '
|
||
f'{alias_gateway_group.display_name}, {alias_ticket_other.display_name}',
|
||
subject='Looping Inquiry',
|
||
return_path=self.other_partner.email_formatted,
|
||
target_model='mail.test.ticket',
|
||
)
|
||
self.assertFalse(
|
||
self.env['mail.test.ticket'].search([('name', '=', 'Looping Inquiry')]),
|
||
'Even if other routes are ok, one looping route is sufficient to block the incoming email'
|
||
)
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread')
|
||
def test_routing_loop_auto_notif(self):
|
||
""" Test Odoo servers talking to each other """
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
self.other_partner.email_formatted,
|
||
f'"Super Help" <{self.alias_ticket.alias_name}@{self.alias_ticket.alias_domain}>',
|
||
subject='Inquiry',
|
||
return_path=self.other_partner.email_formatted,
|
||
target_model='mail.test.ticket',
|
||
)
|
||
self.assertTrue(record)
|
||
self.assertEqual(record.message_partner_ids, self.other_partner)
|
||
|
||
for incoming_count in range(6): # threshold + 1
|
||
with self.mock_mail_gateway():
|
||
record.with_user(self.user_employee).message_post(
|
||
body='Automatic answer',
|
||
message_type='auto_comment',
|
||
subtype_xmlid='mail.mt_comment',
|
||
)
|
||
capture_messages = self.gateway_mail_reply_last_email(MAIL_TEMPLATE)
|
||
msg = capture_messages.records
|
||
self.assertTrue(msg)
|
||
# first messages are accepted -> post a message on record
|
||
if incoming_count < 4: # which makes 5 accepted messages
|
||
self.assertIn(msg, record.message_ids)
|
||
# other attempts triggers only a bounce
|
||
else:
|
||
self.assertFalse(msg.model)
|
||
self.assertFalse(msg.res_id)
|
||
self.assertIn('loop-detection-bounce-email', msg.mail_ids.references,
|
||
'Should be a msg linked to a bounce email with right header')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread')
|
||
def test_routing_loop_follower_alias(self):
|
||
""" Use case: managing follower that are aliases. """
|
||
with self.mock_mail_gateway():
|
||
record = self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
f'"Annoying Customer" <{self.customer_email}>',
|
||
f'"Super Help" <{self.alias_ticket.alias_name}@{self.alias_ticket.alias_domain}>',
|
||
cc=f'{self.alias_partner.email_normalized}, {self.other_partner.email_normalized}',
|
||
subject='Inquiry',
|
||
return_path=self.customer_email,
|
||
target_model='mail.test.ticket',
|
||
)
|
||
self.assertEqual(record.name, 'Inquiry')
|
||
self.assertFalse(record.message_partner_ids, 'Inquiry')
|
||
self.assertNotSentEmail()
|
||
self.assertEqual(record.message_ids.partner_ids, self.other_partner,
|
||
'MailGateway: recipients = alias should not be linked to message')
|
||
|
||
# for some stupid reason, people add an alias as follower
|
||
with self.mock_mail_gateway():
|
||
_message = record.with_user(self.user_employee).message_post(
|
||
body='Answer',
|
||
partner_ids=self.alias_partner.ids,
|
||
)
|
||
last_mail = self._mails # save to reuse
|
||
self.assertSentEmail(self.user_employee.email_formatted, [self.alias_partner.email_formatted])
|
||
|
||
# simulate this email coming back to the same Odoo server -> msg_id is
|
||
# a duplicate, hence rejected
|
||
with RecordCapturer(self.env['mail.test.ticket'], []) as capture_ticket, \
|
||
RecordCapturer(self.env['mail.test.gateway'], []) as capture_gateway:
|
||
self._reinject()
|
||
self.assertFalse(capture_ticket.records)
|
||
self.assertFalse(capture_gateway.records)
|
||
self.assertNotSentEmail()
|
||
self.assertFalse(bool(self._new_msgs))
|
||
|
||
# simulate stupid email providers that rewrites msg_id -> thanks to
|
||
# a custom header, it is rejected as already managed by mailgateway
|
||
self._mails = last_mail
|
||
with RecordCapturer(self.env['mail.test.ticket'], []) as capture_ticket, \
|
||
RecordCapturer(self.env['mail.test.gateway'], []) as capture_gateway:
|
||
self._reinject(force_msg_id='123donotnamemailjet456')
|
||
self.assertFalse(capture_ticket.records)
|
||
self.assertFalse(capture_gateway.records)
|
||
self.assertFalse(bool(self._new_msgs))
|
||
self.assertNotSentEmail()
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_mail', 'odoo.addons.mail.models.mail_thread')
|
||
def test_routing_loop_forward_catchall(self):
|
||
""" Use case: broad email forward to catchall. Example: customer sends an
|
||
email to catchall. It bounces: to=customer, return-path=bounce. Autoreply
|
||
replies to bounce: to=bounce. It is forwarded to catchall. It bounces,
|
||
and hop we have a loop. """
|
||
customer_email = "customer@test.example.com"
|
||
|
||
with self.mock_mail_gateway():
|
||
self.format_and_process(
|
||
MAIL_TEMPLATE,
|
||
f'"Annoying Customer" <{customer_email}>',
|
||
f'"No Reply" <{self.alias_catchall}@{self.alias_domain}>, Unroutable <unroutable@{self.alias_domain}>',
|
||
subject='Should Bounce (initial)',
|
||
return_path=customer_email,
|
||
)
|
||
self.assertSentEmail(
|
||
f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
[customer_email],
|
||
subject='Re: Should Bounce (initial)')
|
||
original_mail = self._mails
|
||
|
||
# auto-reply: write to bounce = no more bounce
|
||
self.gateway_mail_reply_last_email(MAIL_TEMPLATE, force_email_to=f'{self.alias_bounce}@{self.alias_domain}')
|
||
self.assertNotSentEmail()
|
||
|
||
# auto-reply but forwarded to catchall -> should not bounce again
|
||
self._mails = original_mail # just to revert state prior to auto reply
|
||
self.gateway_mail_reply_last_email(MAIL_TEMPLATE, force_email_to=f'{self.alias_catchall}@{self.alias_domain}')
|
||
# TDE FIXME: this should not bounce again
|
||
# self.assertNotSentEmail()
|
||
self.assertSentEmail(
|
||
f'"MAILER-DAEMON" <{self.alias_bounce}@{self.alias_domain}>',
|
||
[customer_email],
|
||
subject=f'Re: Re: Re: Should Bounce (initial)')
|
||
|
||
|
||
@tagged('mail_gateway', 'mail_thread')
|
||
class TestMailThreadCC(MailCommon):
|
||
|
||
@classmethod
|
||
def setUpClass(cls):
|
||
super(TestMailThreadCC, cls).setUpClass()
|
||
|
||
cls.email_from = 'Sylvie Lelitre <test.sylvie.lelitre@agrolait.com>'
|
||
cls.alias = cls.env['mail.alias'].create({
|
||
'alias_contact': 'everyone',
|
||
'alias_domain_id': cls.mail_alias_domain.id,
|
||
'alias_model_id': cls.env['ir.model']._get('mail.test.cc').id,
|
||
'alias_name': 'cc_record',
|
||
})
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_cc_new(self):
|
||
record = self.format_and_process(MAIL_TEMPLATE, self.email_from, f'cc_record@{self.alias_domain}',
|
||
cc='cc1@example.com, cc2@example.com', target_model='mail.test.cc')
|
||
cc = email_split_and_format(record.email_cc)
|
||
self.assertEqual(sorted(cc), ['cc1@example.com', 'cc2@example.com'])
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_cc_update_with_old(self):
|
||
record = self.env['mail.test.cc'].create({'email_cc': 'cc1 <cc1@example.com>, cc2@example.com'})
|
||
self.alias.write({'alias_force_thread_id': record.id})
|
||
|
||
self.format_and_process(MAIL_TEMPLATE, self.email_from, f'cc_record@{self.alias_domain}',
|
||
cc='cc2 <cc2@example.com>, cc3@example.com', target_model='mail.test.cc')
|
||
cc = email_split_and_format(record.email_cc)
|
||
self.assertEqual(sorted(cc), ['"cc1" <cc1@example.com>', 'cc2@example.com', 'cc3@example.com'], 'new cc should have been added on record (unique)')
|
||
|
||
@mute_logger('odoo.addons.mail.models.mail_thread')
|
||
def test_message_cc_update_no_old(self):
|
||
record = self.env['mail.test.cc'].create({})
|
||
self.alias.write({'alias_force_thread_id': record.id})
|
||
|
||
self.format_and_process(MAIL_TEMPLATE, self.email_from, f'cc_record@{self.alias_domain}',
|
||
cc='cc2 <cc2@example.com>, cc3@example.com', target_model='mail.test.cc')
|
||
cc = email_split_and_format(record.email_cc)
|
||
self.assertEqual(sorted(cc), ['"cc2" <cc2@example.com>', 'cc3@example.com'], 'new cc should have been added on record (unique)')
|