Odoo18-Base/odoo/addons/base/tests/test_ir_mail_server_smtpd.py
2025-01-06 10:57:38 +07:00

427 lines
18 KiB
Python

import contextlib
import logging
import shutil
import smtplib
import socket
import ssl
import unittest
import warnings
from base64 import b64encode
from pathlib import Path
from unittest.mock import patch
from socket import getaddrinfo # keep a reference on the non-patched function
from odoo import modules
from odoo.exceptions import UserError
from odoo.tools import config, file_path, mute_logger
from .common import TransactionCaseWithUserDemo
try:
import aiosmtpd
import aiosmtpd.controller
import aiosmtpd.smtp
import aiosmtpd.handlers
except ImportError:
aiosmtpd = None
PASSWORD = 'secretpassword'
_openssl = shutil.which('openssl')
_logger = logging.getLogger(__name__)
def _find_free_local_address():
    """
    Look up a local address on which a TCP service can be bound.

    A throw-away socket is bound on port 0 of the address returned by
    ``aiosmtpd.controller.get_localhost()`` so that the operating system
    picks the port; the socket is closed before returning.

    :return: a ``(family, address, port)`` triple
    """
    host = aiosmtpd.controller.get_localhost()  # 127.0.0.1 or ::1
    if host == '127.0.0.1':
        family = socket.AF_INET
    else:
        family = socket.AF_INET6

    probe = socket.socket(family, socket.SOCK_STREAM)
    with probe:
        probe.bind((host, 0))
        bound_name = probe.getsockname()
    # the port is the second item for both AF_INET and AF_INET6 names
    return family, host, bound_name[1]
def _smtp_authenticate(server, session, enveloppe, mechanism, data):
    """
    Authenticator callback given to aiosmtpd to check a login/password.

    Only ``data.password`` is inspected: the authentication succeeds
    when it equals the encoded module-level ``PASSWORD``. The other
    arguments are accepted but not used.

    :return: an ``aiosmtpd.smtp.AuthResult`` carrying the outcome
    """
    password_ok = data.password == PASSWORD.encode()
    outcome = aiosmtpd.smtp.AuthResult(success=password_ok)
    status = "successfull" if outcome.success else "failed"
    _logger.debug("AUTH %s", status)
    return outcome
class Certificate:
    """
    Holder for the paths of a private key and of its certificate.

    Both paths are resolved with ``file_path(..., filter_ext='.pem')``
    and stored as ``pathlib.Path`` objects. A falsy ``key`` (``None``
    for a certificate without private key) is stored untouched.
    """

    def __init__(self, key, cert):
        # keep a falsy key as-is instead of resolving it
        self.key = Path(file_path(key, filter_ext='.pem')) if key else key
        self.cert = Path(file_path(cert, filter_ext='.pem'))

    def __repr__(self):
        return "Certificate(self.key={!r}, self.cert={!r})".format(
            self.key, self.cert)
# skip when optional dependencies are not found
@unittest.skipUnless(aiosmtpd, "aiosmtpd couldn't be imported")
@unittest.skipUnless(_openssl, "openssl not found in path")
# fail fast for timeout errors
@patch('odoo.addons.base.models.ir_mail_server.SMTP_TIMEOUT', .1)
# prevent the CLI from interfering with the tests
@patch.dict(config.options, {'smtp_server': ''})
class TestIrMailServerSMTPD(TransactionCaseWithUserDemo):
@classmethod
def setUpClass(cls):
    """
    Install the class-wide fixtures shared by all the tests.

    * patch ``aiosmtpd.smtp.Session`` and quieten the aiosmtpd
      warnings/logs triggered by the unusual configurations under test;
    * load the test keys and certificates;
    * patch ``smtplib.SMTP`` and ``smtplib.SMTP_SSL`` so they trust the
      test CA;
    * re-enable the sending of emails during the tests;
    * reserve a local port and patch ``socket.getaddrinfo`` so that
      "localhost" resolves on a fixed address.

    Every patch, as well as the state of the ``mail.log`` logger, is
    restored by a class cleanup.
    """
    super().setUpClass()

    # aiosmtpd emits deprecation warnings because it uses its own
    # deprecated features, replace ``login_data`` by a plain property
    # to mute them.
    # https://github.com/aio-libs/aiosmtpd/issues/347
    class Session(aiosmtpd.smtp.Session):
        @property
        def login_data(self):
            return self._login_data

        @login_data.setter
        def login_data(self, value):
            self._login_data = value

    cls.startClassPatcher(patch('aiosmtpd.smtp.Session', Session))

    # aiosmtpd emits warnings for some unusual configuration, like
    # requiring AUTH on a clear-text transport. Mute those logs
    # since we also test those unusual configurations.
    # NOTE: this filter is added to the process-wide warnings filters
    # and is not removed afterwards.
    warnings.filterwarnings(
        'ignore',
        "Requiring AUTH while not requiring TLS can lead to security vulnerabilities!",
        category=UserWarning
    )

    class CustomFilter(logging.Filter):
        def filter(self, record):
            if record.msg == "auth_required == True but auth_require_tls == False":
                return False
            if record.msg == "tls_context.verify_mode not in {CERT_NONE, CERT_OPTIONAL}; this might cause client connection problems":
                return False
            return True

    mail_logger = logging.getLogger('mail.log')
    log_filter = CustomFilter()
    mail_logger.addFilter(log_filter)
    # do not leak the filter into the tests that run after this class
    cls.addClassCleanup(mail_logger.removeFilter, log_filter)

    # decrease aiosmtpd verbosity, odoo INFO = aiosmtpd WARNING; the
    # level explicitly set on the logger beforehand is restored afterwards
    previous_level = mail_logger.level
    mail_logger.setLevel(_logger.getEffectiveLevel() + 10)
    cls.addClassCleanup(mail_logger.setLevel, previous_level)

    # Get various TLS keys and certificates. CA was used to sign
    # both client and server. self_signed is... self signed.
    cls.ssl_ca, cls.ssl_client, cls.ssl_server, cls.ssl_self_signed = [
        Certificate(None, 'base/tests/ssl/ca.cert.pem'),
        Certificate('base/tests/ssl/client.key.pem',
                    'base/tests/ssl/client.cert.pem'),
        Certificate('base/tests/ssl/server.key.pem',
                    'base/tests/ssl/server.cert.pem'),
        Certificate('base/tests/ssl/self_signed.key.pem',
                    'base/tests/ssl/self_signed.cert.pem'),
    ]

    # Patch the two SMTP client classes into trusting the above CA
    class TEST_SMTP(smtplib.SMTP):
        def starttls(self, *, context=None):
            # ``context`` defaults to None, as in smtplib, so that the
            # fallback below is reachable without an explicit argument
            if context is None:
                # what SMTP_SSL does; ssl.create_default_context() is
                # what it should do
                context = ssl._create_stdlib_context()
            context.load_verify_locations(cafile=str(cls.ssl_ca.cert))
            # hand the (code, message) reply back to the caller
            return super().starttls(context=context)

    class TEST_SMTP_SSL(smtplib.SMTP_SSL):
        def _get_socket(self, *args, **kwargs):
            # the context is the one smtplib created, where it should
            # be a ssl.create_default_context()
            self.context.load_verify_locations(cafile=str(cls.ssl_ca.cert))
            return super()._get_socket(*args, **kwargs)

    cls.startClassPatcher(patch('smtplib.SMTP', TEST_SMTP))
    cls.startClassPatcher(patch('smtplib.SMTP_SSL', TEST_SMTP_SSL))

    # reactivate sending emails during this test suite, make sure
    # NOT TO send emails using another ir.mail_server than the one
    # created in setUp!
    cls.startClassPatcher(patch.object(modules.module, 'current_test', False))

    # fix runbot, docker uses a single ipv4 stack but it gives ::1
    # when resolving "localhost", use the following to force
    # aiosmtpd/odoo to bind/connect to a fixed ipv4 OR ipv6 address.
    family, _, cls.port = _find_free_local_address()
    cls.localhost = getaddrinfo('localhost', cls.port, family)
    cls.startClassPatcher(patch('socket.getaddrinfo', cls.getaddrinfo))
@classmethod
def getaddrinfo(cls, host, port, *args, **kwargs):
    """
    Replacement for ``socket.getaddrinfo`` installed by ``setUpClass``.

    Resolve both "localhost" and "notlocalhost" on the ip address
    bound by aiosmtpd inside `start_smtpd`, as long as the requested
    port is the one reserved for the tests. Any other lookup is
    delegated to the non-patched function.

    :param host: the host name to resolve
    :param port: the port (or service) to resolve
    :param args: extra positional arguments of ``socket.getaddrinfo``
        (family, type, proto, flags)
    :param kwargs: extra keyword arguments of ``socket.getaddrinfo``
    :return: a list of address-info tuples, as ``socket.getaddrinfo``
    """
    if host in ('localhost', 'notlocalhost') and port == cls.port:
        return cls.localhost
    # forward the caller's family/type/proto/flags instead of silently
    # replacing them by zeros
    return getaddrinfo(host, port, *args, **kwargs)
@contextlib.contextmanager
def start_smtpd(
    self, encryption, ssl_context=None, auth_required=True, stop_on_cleanup=True
):
    """
    Run a smtp daemon in a background thread for the duration of the
    context manager; the daemon is stopped upon exiting.

    :param encryption: the kind of server to start, one of 'none',
        'ssl' or 'starttls'.
    :param ssl_context: the ``ssl.SSLContext`` object to use, it is
        mandatory with 'ssl' and 'starttls'.
    :param auth_required: whether the server enforces password
        authentication or not.
    :param stop_on_cleanup: accepted but not used by this method.
    :return: yields the started ``aiosmtpd.controller.Controller``
    """
    assert encryption in ('none', 'ssl', 'starttls')
    assert encryption == 'none' or ssl_context

    options = {
        'hostname': aiosmtpd.controller.get_localhost(),
        'server_hostname': 'localhost',
        'port': self.port,
        'auth_required': auth_required,
        'auth_require_tls': False,
        'enable_SMTPUTF8': True,
    }
    if encryption == 'ssl':
        # for aiosmtpd.controller.InetMixin
        options['ssl_context'] = ssl_context
    elif encryption == 'starttls':
        # for aiosmtpd.smtp.SMTP
        options['require_starttls'] = True
        options['tls_context'] = ssl_context
    if auth_required:
        options['authenticator'] = _smtp_authenticate

    controller = aiosmtpd.controller.Controller(
        aiosmtpd.handlers.Debugging(), **options)
    try:
        controller.start()
        yield controller
    finally:
        controller.stop()
@mute_logger('mail.log')
def test_authentication_certificate_matrix(self):
    """
    Authenticate against a server that identifies its users with a
    TLS client certificate.

    Three client configurations are tried (no certificate, a self
    signed certificate and a certificate signed by the test CA), each
    against both a STARTTLS and a SSL/TLS SMTP server. Only the last
    configuration is expected to connect.
    """
    def pem_b64(path):
        return b64encode(path.read_bytes())

    mail_server = self.env['ir.mail_server'].create({
        'name': 'test smtpd',
        'from_filter': 'localhost',
        'smtp_host': 'localhost',
        'smtp_port': self.port,
        'smtp_authentication': 'login',
        'smtp_user': '',
        'smtp_pass': '',
    })

    # the server presents its own certificate and demands a client
    # certificate that validates against the CA
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(self.ssl_server.cert, self.ssl_server.key)
    server_context.load_verify_locations(cafile=self.ssl_ca.cert)
    server_context.verify_mode = ssl.CERT_REQUIRED

    connection_closed = (
        r"The server has closed the connection unexpectedly\. "
        r"Check configuration served on this port number\.\n "
        r"Connection unexpectedly closed"
    )
    scenarios = [
        # authentication, name, certificate, private key, error pattern
        ('login', "missing", '', '', connection_closed),
        ('certificate', "self signed",
         pem_b64(self.ssl_self_signed.cert),
         pem_b64(self.ssl_self_signed.key),
         connection_closed),
        ('certificate', "valid client",
         pem_b64(self.ssl_client.cert),
         pem_b64(self.ssl_client.key),
         None),
    ]

    for encryption in ('starttls', 'ssl'):
        mail_server.smtp_encryption = encryption
        with self.start_smtpd(encryption, server_context, auth_required=False):
            for authentication, name, cert, key, expected_error in scenarios:
                with self.subTest(encryption=encryption, certificate=name):
                    mail_server.write({
                        'smtp_authentication': authentication,
                        'smtp_ssl_certificate': cert,
                        'smtp_ssl_private_key': key,
                    })
                    if not expected_error:
                        mail_server.test_smtp_connection()
                        continue
                    with self.assertRaises(UserError) as raised:
                        mail_server.test_smtp_connection()
                    self.assertRegex(raised.exception.args[0], expected_error)
def test_authentication_login_matrix(self):
    """
    Authenticate against a server that identifies its users with a
    login/password pair.

    The missing, invalid and valid pairs are tried against a SMTP
    server without encryption, a STARTTLS one and a SSL/TLS one. A
    fresh smtpd is started for every combination.
    """
    no_password = ''
    wrong_password = 'bad password'

    mail_server = self.env['ir.mail_server'].create({
        'name': 'test smtpd',
        'from_filter': 'localhost',
        'smtp_host': 'localhost',
        'smtp_port': self.port,
        'smtp_authentication': 'login',
        'smtp_user': '',
        'smtp_pass': '',
    })
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(self.ssl_server.cert, self.ssl_server.key)

    authentication_required = (
        r"The server refused the sender address \(noreply@localhost\) "
        r"with error b'5\.7\.0 Authentication required'"
    )
    closed_on_timeout = (
        r"The server has closed the connection unexpectedly\. "
        r"Check configuration served on this port number\.\n "
        r"Connection unexpectedly closed:.* timed out"
    )
    scenarios = [
        # auth_required, password, error_pattern
        (False, no_password, None),
        (True, no_password, authentication_required),
        (True, wrong_password, closed_on_timeout),
        (True, PASSWORD, None),
    ]

    for encryption in ('none', 'starttls', 'ssl'):
        mail_server.smtp_encryption = encryption
        for auth_required, password, expected_error in scenarios:
            # no login at all when there is no password
            mail_server.smtp_user = self.user_demo.email if password else password
            mail_server.smtp_pass = password
            with self.subTest(encryption=encryption,
                              auth_required=auth_required,
                              password=password):
                with self.start_smtpd(encryption, server_context, auth_required):
                    if not expected_error:
                        mail_server.test_smtp_connection()
                        continue
                    with self.assertRaises(UserError) as raised:
                        mail_server.test_smtp_connection()
                    self.assertRegex(raised.exception.args[0], expected_error)
@mute_logger('mail.log')
def test_encryption_matrix(self):
    """
    Connect with an encryption mode that differs from the one the
    server is running, for every mismatching client/server pair, and
    verify that each attempt fails with a meaningful error message.
    """
    mail_server = self.env['ir.mail_server'].create({
        'name': 'test smtpd',
        'from_filter': 'localhost',
        'smtp_host': 'localhost',
        'smtp_port': self.port,
        'smtp_authentication': 'login',
        'smtp_user': '',
        'smtp_pass': '',
    })
    server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    server_context.load_cert_chain(self.ssl_server.cert, self.ssl_server.key)

    closed_on_timeout = (
        r"The server has closed the connection unexpectedly\. "
        r"Check configuration served on this port number\.\n "
        r"Connection unexpectedly closed: timed out"
    )
    starttls_first = (
        r"The server refused the sender address \(noreply@localhost\) with error "
        r"b'Must issue a STARTTLS command first'"
    )
    starttls_unsupported = (
        r"An option is not supported by the server:\n "
        r"STARTTLS extension not supported by server\."
    )
    wrong_version = (
        r"An SSL exception occurred\. "
        r"Check connection security type\.\n "
        r".*?wrong version number"
    )
    scenarios = [
        # client, server, error_pattern
        ('none', 'ssl', closed_on_timeout),
        ('none', 'starttls', starttls_first),
        ('starttls', 'none', starttls_unsupported),
        ('starttls', 'ssl', closed_on_timeout),
        ('ssl', 'none', wrong_version),
        ('ssl', 'starttls', wrong_version),
    ]

    for client_side, server_side, expected_error in scenarios:
        with self.subTest(server_encryption=server_side,
                          client_encryption=client_side):
            mail_server.smtp_encryption = client_side
            with self.start_smtpd(server_side, server_context, auth_required=False):
                with self.assertRaises(UserError) as raised:
                    mail_server.test_smtp_connection()
                self.assertRegex(raised.exception.args[0], expected_error)
def test_man_in_the_middle_matrix(self):
    """
    Simulate a pirate who managed to intercept the live traffic in
    between the Odoo server and the legitimate SMTP server.

    The rogue server shows either a certificate that is not signed by
    the CA or answers under a host name that is not the expected one.
    Every scenario currently expects the connection to succeed.
    """
    trusted_cert = self.ssl_server
    rogue_cert = self.ssl_self_signed
    trusted_host = 'localhost'
    rogue_host = 'notlocalhost'

    mail_server = self.env['ir.mail_server'].create({
        'name': 'test smtpd',
        'from_filter': 'localhost',
        'smtp_host': 'localhost',
        'smtp_port': self.port,
        'smtp_authentication': 'login',
        'smtp_user': self.user_demo.email,
        'smtp_pass': PASSWORD,
        'smtp_ssl_certificate': b64encode(self.ssl_client.cert.read_bytes()),
        'smtp_ssl_private_key': b64encode(self.ssl_client.key.read_bytes()),
    })

    # for now it doesn't raise any error for bad cert/host
    scenarios = [
        # authentication, certificate, hostname, error_pattern
        ('login', rogue_cert, trusted_host, None),
        ('login', trusted_cert, rogue_host, None),
        ('certificate', rogue_cert, trusted_host, None),
        ('certificate', trusted_cert, rogue_host, None),
    ]

    for encryption in ('starttls', 'ssl'):
        for authentication, certificate, hostname, expected_error in scenarios:
            mail_server.smtp_host = hostname
            mail_server.smtp_authentication = authentication
            mail_server.smtp_encryption = encryption
            with self.subTest(
                encryption=encryption,
                authentication=authentication,
                cert_good=certificate == trusted_cert,
                host_good=hostname == trusted_host,
            ):
                rogue_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                rogue_context.load_cert_chain(certificate.cert, certificate.key)
                # the password is only enforced with the 'login' mode
                auth_required = authentication == 'login'
                with self.start_smtpd(encryption, rogue_context, auth_required):
                    if not expected_error:
                        mail_server.test_smtp_connection()
                        continue
                    with self.assertRaises(UserError) as raised:
                        mail_server.test_smtp_connection()
                    self.assertRegex(raised.exception.args[0], expected_error)