427 lines
18 KiB
Python
427 lines
18 KiB
Python
import contextlib
|
|
import logging
|
|
import shutil
|
|
import smtplib
|
|
import socket
|
|
import ssl
|
|
import unittest
|
|
import warnings
|
|
from base64 import b64encode
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
from socket import getaddrinfo # keep a reference on the non-patched function
|
|
|
|
from odoo import modules
|
|
from odoo.exceptions import UserError
|
|
from odoo.tools import config, file_path, mute_logger
|
|
from .common import TransactionCaseWithUserDemo
|
|
|
|
try:
|
|
import aiosmtpd
|
|
import aiosmtpd.controller
|
|
import aiosmtpd.smtp
|
|
import aiosmtpd.handlers
|
|
except ImportError:
|
|
aiosmtpd = None
|
|
|
|
|
|
PASSWORD = 'secretpassword'
|
|
_openssl = shutil.which('openssl')
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _find_free_local_address():
|
|
""" Get a triple (family, address, port) on which it possible to bind
|
|
a local tcp service. """
|
|
addr = aiosmtpd.controller.get_localhost() # it returns 127.0.0.1 or ::1
|
|
family = socket.AF_INET if addr == '127.0.0.1' else socket.AF_INET6
|
|
with socket.socket(family, socket.SOCK_STREAM) as sock:
|
|
sock.bind((addr, 0))
|
|
port = sock.getsockname()[1]
|
|
return family, addr, port
|
|
|
|
|
|
def _smtp_authenticate(server, session, enveloppe, mechanism, data):
|
|
""" Callback method used by aiosmtpd to validate a login/password pair. """
|
|
result = aiosmtpd.smtp.AuthResult(success=data.password == PASSWORD.encode())
|
|
_logger.debug("AUTH %s", "successfull" if result.success else "failed")
|
|
return result
|
|
|
|
|
|
class Certificate:
|
|
def __init__(self, key, cert):
|
|
self.key = key and Path(file_path(key, filter_ext='.pem'))
|
|
self.cert = Path(file_path(cert, filter_ext='.pem'))
|
|
|
|
def __repr__(self):
|
|
return f"Certificate({self.key=}, {self.cert=})"
|
|
|
|
|
|
# skip when optional dependencies are not found
|
|
@unittest.skipUnless(aiosmtpd, "aiosmtpd couldn't be imported")
|
|
@unittest.skipUnless(_openssl, "openssl not found in path")
|
|
# fail fast for timeout errors
|
|
@patch('odoo.addons.base.models.ir_mail_server.SMTP_TIMEOUT', .1)
|
|
# prevent the CLI from interfering with the tests
|
|
@patch.dict(config.options, {'smtp_server': ''})
|
|
class TestIrMailServerSMTPD(TransactionCaseWithUserDemo):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
super().setUpClass()
|
|
|
|
# aiosmtpd emits deprecation warnings because it uses its own
|
|
# deprecated features, mute those logs.
|
|
# https://github.com/aio-libs/aiosmtpd/issues/347
|
|
class Session(aiosmtpd.smtp.Session):
|
|
@property
|
|
def login_data(self):
|
|
return self._login_data
|
|
@login_data.setter
|
|
def login_data(self, value):
|
|
self._login_data = value
|
|
patcher = patch('aiosmtpd.smtp.Session', Session)
|
|
patcher.start()
|
|
cls.addClassCleanup(patcher.stop)
|
|
|
|
# aiosmtpd emits warnings for some unusual configuration, like
|
|
# requiring AUTH on a clear-text transport. Mute those logs
|
|
# since we also test those unusual configurations.
|
|
warnings.filterwarnings(
|
|
'ignore',
|
|
"Requiring AUTH while not requiring TLS can lead to security vulnerabilities!",
|
|
category=UserWarning
|
|
)
|
|
class CustomFilter(logging.Filter):
|
|
def filter(self, record):
|
|
if record.msg == "auth_required == True but auth_require_tls == False":
|
|
return False
|
|
if record.msg == "tls_context.verify_mode not in {CERT_NONE, CERT_OPTIONAL}; this might cause client connection problems":
|
|
return False
|
|
return True
|
|
logging.getLogger('mail.log').addFilter(CustomFilter())
|
|
|
|
# decrease aiosmtpd verbosity, odoo INFO = aiosmtpd WARNING
|
|
logging.getLogger('mail.log').setLevel(_logger.getEffectiveLevel() + 10)
|
|
|
|
# Get various TLS keys and certificates. CA was used to sign
|
|
# both client and server. self_signed is... self signed.
|
|
cls.ssl_ca, cls.ssl_client, cls.ssl_server, cls.ssl_self_signed = [
|
|
Certificate(None, 'base/tests/ssl/ca.cert.pem'),
|
|
Certificate('base/tests/ssl/client.key.pem',
|
|
'base/tests/ssl/client.cert.pem'),
|
|
Certificate('base/tests/ssl/server.key.pem',
|
|
'base/tests/ssl/server.cert.pem'),
|
|
Certificate('base/tests/ssl/self_signed.key.pem',
|
|
'base/tests/ssl/self_signed.cert.pem'),
|
|
]
|
|
|
|
# Patch the two SMTP client classes into trusting the above CA
|
|
class TEST_SMTP(smtplib.SMTP):
|
|
def starttls(self, *, context):
|
|
if context is None:
|
|
context = ssl._create_stdlib_context() # what SMTP_SSL does
|
|
# context = ssl.create_default_context() # what it should do
|
|
context.load_verify_locations(cafile=str(cls.ssl_ca.cert))
|
|
super().starttls(context=context)
|
|
class TEST_SMTP_SSL(smtplib.SMTP_SSL):
|
|
def _get_socket(self, *args, **kwargs):
|
|
# self.context = ssl.create_default_context() # what it should do
|
|
self.context.load_verify_locations(cafile=str(cls.ssl_ca.cert))
|
|
return super()._get_socket(*args, **kwargs)
|
|
patcher = patch('smtplib.SMTP', TEST_SMTP)
|
|
patcher.start()
|
|
cls.addClassCleanup(patcher.stop)
|
|
patcher = patch('smtplib.SMTP_SSL', TEST_SMTP_SSL)
|
|
patcher.start()
|
|
cls.addClassCleanup(patcher.stop)
|
|
|
|
# reactivate sending emails during this test suite, make sure
|
|
# NOT TO send emails using another ir.mail_server than the one
|
|
# created in setUp!
|
|
patcher = patch.object(modules.module, 'current_test', False)
|
|
patcher.start()
|
|
cls.addClassCleanup(patcher.stop)
|
|
|
|
# fix runbot, docker uses a single ipv4 stack but it gives ::1
|
|
# when resolving "localhost" (so stupid), use the following to
|
|
# force aiosmtpd/odoo to bind/connect to a fixed ipv4 OR ipv6
|
|
# address.
|
|
family, _, cls.port = _find_free_local_address()
|
|
cls.localhost = getaddrinfo('localhost', cls.port, family)
|
|
cls.startClassPatcher(patch('socket.getaddrinfo', cls.getaddrinfo))
|
|
|
|
@classmethod
|
|
def getaddrinfo(cls, host, port, *args, **kwargs):
|
|
"""
|
|
Resolve both "localhost" and "notlocalhost" on the ip address
|
|
bound by aiosmtpd inside `start_smtpd`.
|
|
"""
|
|
if host in ('localhost', 'notlocalhost') and port == cls.port:
|
|
return cls.localhost
|
|
return getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)
|
|
|
|
@contextlib.contextmanager
|
|
def start_smtpd(
|
|
self, encryption, ssl_context=None, auth_required=True, stop_on_cleanup=True
|
|
):
|
|
"""
|
|
Start a smtp daemon in a background thread, stop it upon exiting
|
|
the context manager.
|
|
|
|
:param encryption: 'none', 'ssl' or 'starttls', the kind of
|
|
server to start.
|
|
:param ssl_context: the ``ssl.SSLContext`` object to use with
|
|
'ssl' or 'starttls'.
|
|
:param auth_required: whether the server enforces password
|
|
authentication or not.
|
|
"""
|
|
assert encryption in ('none', 'ssl', 'starttls')
|
|
assert encryption == 'none' or ssl_context
|
|
|
|
kwargs = {}
|
|
if encryption == 'starttls':
|
|
# for aiosmtpd.smtp.SMTP
|
|
kwargs.update({
|
|
'require_starttls': True,
|
|
'tls_context': ssl_context,
|
|
})
|
|
elif encryption == 'ssl':
|
|
# for aiosmtpd.controller.InetMixin
|
|
kwargs['ssl_context'] = ssl_context
|
|
if auth_required:
|
|
kwargs['authenticator'] = _smtp_authenticate
|
|
|
|
smtpd_thread = aiosmtpd.controller.Controller(
|
|
aiosmtpd.handlers.Debugging(),
|
|
hostname=aiosmtpd.controller.get_localhost(),
|
|
server_hostname='localhost',
|
|
port=self.port,
|
|
auth_required=auth_required,
|
|
auth_require_tls=False,
|
|
enable_SMTPUTF8=True,
|
|
**kwargs,
|
|
)
|
|
try:
|
|
smtpd_thread.start()
|
|
yield smtpd_thread
|
|
finally:
|
|
smtpd_thread.stop()
|
|
|
|
@mute_logger('mail.log')
|
|
def test_authentication_certificate_matrix(self):
|
|
"""
|
|
Connect to a server that is authenticating users via a TLS
|
|
certificate. Test the various possible configurations (missing
|
|
cert, invalid cert and valid cert) against both a STARTTLS and
|
|
a SSL/TLS SMTP server.
|
|
"""
|
|
mail_server = self.env['ir.mail_server'].create({
|
|
'name': 'test smtpd',
|
|
'from_filter': 'localhost',
|
|
'smtp_host': 'localhost',
|
|
'smtp_port': self.port,
|
|
'smtp_authentication': 'login',
|
|
'smtp_user': '',
|
|
'smtp_pass': '',
|
|
})
|
|
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
ssl_context.load_cert_chain(self.ssl_server.cert, self.ssl_server.key)
|
|
ssl_context.load_verify_locations(cafile=self.ssl_ca.cert)
|
|
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
|
|
|
self_signed_key = b64encode(self.ssl_self_signed.key.read_bytes())
|
|
self_signed_cert = b64encode(self.ssl_self_signed.cert.read_bytes())
|
|
client_key = b64encode(self.ssl_client.key.read_bytes())
|
|
client_cert = b64encode(self.ssl_client.cert.read_bytes())
|
|
matrix = [
|
|
# authentication, name, certificate, private key, error pattern
|
|
('login', "missing", '', '',
|
|
r"The server has closed the connection unexpectedly\. "
|
|
r"Check configuration served on this port number\.\n "
|
|
r"Connection unexpectedly closed"),
|
|
('certificate', "self signed", self_signed_cert, self_signed_key,
|
|
r"The server has closed the connection unexpectedly\. "
|
|
r"Check configuration served on this port number\.\n "
|
|
r"Connection unexpectedly closed"),
|
|
('certificate', "valid client", client_cert, client_key, None),
|
|
]
|
|
|
|
for encryption in ('starttls', 'ssl'):
|
|
mail_server.smtp_encryption = encryption
|
|
with self.start_smtpd(encryption, ssl_context, auth_required=False):
|
|
for authentication, name, certificate, private_key, error_pattern in matrix:
|
|
with self.subTest(encryption=encryption, certificate=name):
|
|
mail_server.write({
|
|
'smtp_authentication': authentication,
|
|
'smtp_ssl_certificate': certificate,
|
|
'smtp_ssl_private_key': private_key,
|
|
})
|
|
if error_pattern:
|
|
with self.assertRaises(UserError) as error_capture:
|
|
mail_server.test_smtp_connection()
|
|
self.assertRegex(error_capture.exception.args[0], error_pattern)
|
|
else:
|
|
mail_server.test_smtp_connection()
|
|
|
|
|
|
def test_authentication_login_matrix(self):
|
|
"""
|
|
Connect to a server that is authenticating users via a login/pwd
|
|
pair. Test the various possible configurations (missing pair,
|
|
invalid pair and valid pair) against both a SMTP server without
|
|
encryption, a STARTTLS and a SSL/TLS SMTP server.
|
|
"""
|
|
mail_server = self.env['ir.mail_server'].create({
|
|
'name': 'test smtpd',
|
|
'from_filter': 'localhost',
|
|
'smtp_host': 'localhost',
|
|
'smtp_port': self.port,
|
|
'smtp_authentication': 'login',
|
|
'smtp_user': '',
|
|
'smtp_pass': '',
|
|
})
|
|
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
ssl_context.load_cert_chain(self.ssl_server.cert, self.ssl_server.key)
|
|
|
|
MISSING = ''
|
|
INVALID = 'bad password'
|
|
matrix = [
|
|
# auth_required, password, error_pattern
|
|
(False, MISSING, None),
|
|
(True, MISSING,
|
|
r"The server refused the sender address \(noreply@localhost\) "
|
|
r"with error b'5\.7\.0 Authentication required'"),
|
|
(True, INVALID,
|
|
r"The server has closed the connection unexpectedly\. "
|
|
r"Check configuration served on this port number\.\n "
|
|
r"Connection unexpectedly closed:.* timed out"),
|
|
(True, PASSWORD, None),
|
|
]
|
|
|
|
for encryption in ('none', 'starttls', 'ssl'):
|
|
mail_server.smtp_encryption = encryption
|
|
for auth_required, password, error_pattern in matrix:
|
|
mail_server.smtp_user = password and self.user_demo.email
|
|
mail_server.smtp_pass = password
|
|
with self.subTest(encryption=encryption,
|
|
auth_required=auth_required,
|
|
password=password):
|
|
with self.start_smtpd(encryption, ssl_context, auth_required):
|
|
if error_pattern:
|
|
with self.assertRaises(UserError) as capture:
|
|
mail_server.test_smtp_connection()
|
|
self.assertRegex(capture.exception.args[0], error_pattern)
|
|
else:
|
|
mail_server.test_smtp_connection()
|
|
|
|
@mute_logger('mail.log')
|
|
def test_encryption_matrix(self):
|
|
"""
|
|
Connect to a server on a different encryption configuration than
|
|
the server is configured. Verify that it crashes with a good
|
|
error message.
|
|
"""
|
|
mail_server = self.env['ir.mail_server'].create({
|
|
'name': 'test smtpd',
|
|
'from_filter': 'localhost',
|
|
'smtp_host': 'localhost',
|
|
'smtp_port': self.port,
|
|
'smtp_authentication': 'login',
|
|
'smtp_user': '',
|
|
'smtp_pass': '',
|
|
})
|
|
|
|
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
ssl_context.load_cert_chain(self.ssl_server.cert, self.ssl_server.key)
|
|
|
|
matrix = [
|
|
# client, server, error_pattern
|
|
('none', 'ssl',
|
|
r"The server has closed the connection unexpectedly\. "
|
|
r"Check configuration served on this port number\.\n "
|
|
r"Connection unexpectedly closed: timed out"),
|
|
('none', 'starttls',
|
|
r"The server refused the sender address \(noreply@localhost\) with error "
|
|
r"b'Must issue a STARTTLS command first'"),
|
|
('starttls', 'none',
|
|
r"An option is not supported by the server:\n "
|
|
r"STARTTLS extension not supported by server\."),
|
|
('starttls', 'ssl',
|
|
r"The server has closed the connection unexpectedly\. "
|
|
r"Check configuration served on this port number\.\n "
|
|
r"Connection unexpectedly closed: timed out"),
|
|
('ssl', 'none',
|
|
r"An SSL exception occurred\. "
|
|
r"Check connection security type\.\n "
|
|
r".*?wrong version number"),
|
|
('ssl', 'starttls',
|
|
r"An SSL exception occurred\. "
|
|
r"Check connection security type\.\n "
|
|
r".*?wrong version number"),
|
|
]
|
|
|
|
for client_encryption, server_encryption, error_pattern in matrix:
|
|
with self.subTest(server_encryption=server_encryption,
|
|
client_encryption=client_encryption):
|
|
mail_server.smtp_encryption = client_encryption
|
|
with self.start_smtpd(server_encryption, ssl_context, auth_required=False):
|
|
with self.assertRaises(UserError) as capture:
|
|
mail_server.test_smtp_connection()
|
|
self.assertRegex(capture.exception.args[0], error_pattern)
|
|
|
|
def test_man_in_the_middle_matrix(self):
|
|
"""
|
|
Simulate that a pirate was successful at intercepting the live
|
|
traffic in between the Odoo server and the legitimate SMTP
|
|
server.
|
|
"""
|
|
mail_server = self.env['ir.mail_server'].create({
|
|
'name': 'test smtpd',
|
|
'from_filter': 'localhost',
|
|
'smtp_host': 'localhost',
|
|
'smtp_port': self.port,
|
|
'smtp_authentication': 'login',
|
|
'smtp_user': self.user_demo.email,
|
|
'smtp_pass': PASSWORD,
|
|
'smtp_ssl_certificate': b64encode(self.ssl_client.cert.read_bytes()),
|
|
'smtp_ssl_private_key': b64encode(self.ssl_client.key.read_bytes()),
|
|
})
|
|
|
|
cert_good = self.ssl_server
|
|
cert_bad = self.ssl_self_signed
|
|
host_good = 'localhost'
|
|
host_bad = 'notlocalhost'
|
|
|
|
# for now it doesn't raise any error for bad cert/host
|
|
matrix = [
|
|
# authentication, certificate, hostname, error_pattern
|
|
('login', cert_bad, host_good, None),
|
|
('login', cert_good, host_bad, None),
|
|
('certificate', cert_bad, host_good, None),
|
|
('certificate', cert_good, host_bad, None),
|
|
]
|
|
|
|
for encryption in ('starttls', 'ssl'):
|
|
for authentication, certificate, hostname, error_pattern in matrix:
|
|
mail_server.smtp_host = hostname
|
|
mail_server.smtp_authentication = authentication
|
|
mail_server.smtp_encryption = encryption
|
|
with self.subTest(
|
|
encryption=encryption,
|
|
authentication=authentication,
|
|
cert_good=certificate == cert_good,
|
|
host_good=hostname == host_good,
|
|
):
|
|
mitm_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
mitm_context.load_cert_chain(certificate.cert, certificate.key)
|
|
auth_required = authentication == 'login'
|
|
with self.start_smtpd(encryption, mitm_context, auth_required):
|
|
if error_pattern:
|
|
with self.assertRaises(UserError) as capture:
|
|
mail_server.test_smtp_connection()
|
|
self.assertRegex(capture.exception.args[0], error_pattern)
|
|
else:
|
|
mail_server.test_smtp_connection()
|