# Odoo18-Base/addons/sms/tests/common.py
# 2025-01-06 10:57:38 +07:00
#
# 339 lines
# 17 KiB
# Python

# -*- coding: utf-8 -*-
from contextlib import contextmanager
from freezegun import freeze_time
from unittest.mock import patch
from odoo import exceptions, tools
from odoo.addons.mail.tests.common import MailCommon
from odoo.addons.phone_validation.tools import phone_validation
from odoo.addons.sms.models.sms_sms import SmsApi, SmsSms
from odoo.tests import common
class MockSMS(common.TransactionCase):
    """ Test case providing a mocked SMS gateway.

    While the gateway mock is active, every number / body pair accepted by
    the mocked IAP endpoints is stored in ``self._sms`` and every ``sms.sms``
    record created is accumulated in ``self._new_sms``. """

    def tearDown(self):
        super().tearDown()
        self._clear_sms_sent()

    # ------------------------------------------------------------
    # UTILITY MOCKS
    # ------------------------------------------------------------

    @contextmanager
    def mock_datetime_and_now(self, mock_dt):
        """ Freeze the clock and patch ``env.cr.now()`` so both return
        ``mock_dt``. Useful when the synchronization date (taken from the
        cursor) matters in addition to standard datetime mocks, mainly to
        detect sync issues.

        :param mock_dt: value given to ``freeze_time`` and returned by the
          patched ``env.cr.now``;
        """
        frozen_clock = freeze_time(mock_dt)
        cursor_now = patch.object(self.env.cr, 'now', lambda: mock_dt)
        with frozen_clock, cursor_now:
            yield

    # ------------------------------------------------------------
    # GATEWAY MOCK
    # ------------------------------------------------------------

    @contextmanager
    def mockSMSGateway(self, sms_allow_unlink=False, sim_error=None, nbr_t_error=None, moderated=False, force_delivered=False):
        """ Patch ``SmsApi._contact_iap``, ``SmsSms.create`` and
        ``SmsSms._send`` for the duration of the context.

        :param sms_allow_unlink: when falsy, ``_send`` is always called with
          ``unlink_failed=False`` and ``unlink_sent=False`` so that records
          are kept; when truthy the caller's values are forwarded;
        :param sim_error: error applied to every number. ``'credit'`` is
          reported as ``insufficient_credit`` and ``'jsonrpc_exception'``
          raises an ``AccessError``;
        :param nbr_t_error: optional mapping ``number -> error``, looked up
          only when ``sim_error`` is falsy;
        :param moderated: on the ``/api/sms/3/send`` endpoint, report
          ``processing`` instead of ``success`` (unless ``force_delivered``);
        :param force_delivered: report ``delivered`` instead of ``success``;
        """
        self._clear_sms_sent()
        original_create = SmsSms.create
        original_send = SmsSms._send

        def _raise_gateway_error(endpoint):
            raise exceptions.AccessError(
                'The url that this service requested returned an error. '
                'Please contact the author of the app. '
                'The url it tried to contact was ' + endpoint
            )

        def _single_send(params):
            # mock single sms sending: one body for several numbers
            self._sms.extend([
                {'number': number, 'body': params['message']}
                for number in params['numbers']
            ])
            return True  # send_message v0 API returns always True

        def _batch_send_v2(endpoint, params):
            # mock batch sending
            ok_state = 'delivered' if force_delivered else 'success'
            outcomes = []
            for entry in params['messages']:
                outcome = {'res_id': entry['res_id'], 'state': ok_state, 'credit': 1}
                failure = sim_error or (nbr_t_error and nbr_t_error.get(entry['number']))
                if failure:
                    if failure == 'credit':
                        outcome.update(credit=0, state='insufficient_credit')
                    elif failure in {'wrong_number_format', 'unregistered', 'server_error'}:
                        outcome.update(state=failure)
                    elif failure == 'jsonrpc_exception':
                        _raise_gateway_error(endpoint)
                outcomes.append(outcome)
                # only messages that were not turned into a failure are stored
                if outcome['state'] in ('success', 'delivered'):
                    self._sms.append({
                        'number': entry['number'],
                        'body': entry['content'],
                    })
            return outcomes

        def _batch_send_v3(endpoint, params):
            outcomes = []
            for message in params['messages']:
                for recipient in message["numbers"]:
                    failure = sim_error or (nbr_t_error and nbr_t_error.get(recipient['number']))
                    if failure == 'jsonrpc_exception':
                        _raise_gateway_error(endpoint)
                    if failure == 'credit':
                        failure = 'insufficient_credit'
                    if failure:
                        # credit is only given if the amount is known
                        outcome = {'uuid': recipient['uuid'], 'state': failure, 'credit': 0}
                    else:
                        if force_delivered:
                            ok_state = 'delivered'
                        elif moderated:
                            ok_state = 'processing'
                        else:
                            ok_state = 'success'
                        outcome = {'uuid': recipient['uuid'], 'state': ok_state, 'credit': 1}
                        self._sms.append({
                            'number': recipient['number'],
                            'body': message['content'],
                            'uuid': recipient['uuid'],
                        })
                    outcomes.append(outcome)
            return outcomes

        def _contact_iap(local_endpoint, params):
            if local_endpoint == '/iap/message_send':
                return _single_send(params)
            if local_endpoint == '/iap/sms/2/send':
                return _batch_send_v2(local_endpoint, params)
            if local_endpoint == '/api/sms/3/send':
                return _batch_send_v3(local_endpoint, params)
            # any other endpoint is not mocked
            return None

        def _sms_sms_create(model, *args, **kwargs):
            created = original_create(model, *args, **kwargs)
            self._new_sms += created.sudo()
            return created

        def _sms_sms_send(records, unlink_failed=False, unlink_sent=True, raise_exception=False):
            if not sms_allow_unlink:
                # keep records so that tests can inspect them afterwards
                unlink_failed = False
                unlink_sent = False
            return original_send(records, unlink_failed=unlink_failed, unlink_sent=unlink_sent, raise_exception=raise_exception)

        with patch.object(SmsApi, '_contact_iap', side_effect=_contact_iap), \
                patch.object(SmsSms, 'create', autospec=True, wraps=SmsSms, side_effect=_sms_sms_create), \
                patch.object(SmsSms, '_send', autospec=True, wraps=SmsSms, side_effect=_sms_sms_send):
            yield

    def _clear_sms_sent(self):
        """ Reset the containers filled by the gateway mock. """
        self._sms = []
        self._new_sms = self.env['sms.sms'].sudo()

    def _clear_outgoing_sms(self):
        """ As SMS gateway mock keeps SMS, we may need to remove them manually
        if there are several tests in the same tx. """
        outgoing = self.env['sms.sms'].sudo().search([('state', '=', 'outgoing')])
        outgoing.unlink()
class SMSCase(MockSMS):
    """ Main test class to use when testing SMS integrations. Contains helpers and tools related
    to notification sent by SMS. """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # This is called to make sure that an iap_account for sms already exists or if not is created.
        cls.env['iap.account'].get('sms')

    def _find_sms_sent(self, partner, number):
        """ Return the first entry of ``self._sms`` matching ``number``.

        :param partner: used to compute the number when ``number`` is None,
          and in the error message;
        :param number: number to look for;
        :raise AssertionError: if no stored SMS matches;
        """
        if number is None and partner:
            number = partner._phone_format()
        for candidate in self._sms:
            if candidate['number'] == number:
                return candidate
        raise AssertionError('sent sms not found for %s (number: %s)' % (partner, number))

    def _find_sms_sms(self, partner, number, status, content=None):
        """ Return the single ``sms.sms`` record, among those created under
        the gateway mock, matching partner / number (and status if given).

        :param content: used to disambiguate when several records match:
          only records whose body contains it are kept;
        :raise AssertionError: if nothing matches;
        :raise NotImplementedError: if several records still match;
        """
        if number is None and partner:
            number = partner._phone_format()
        domain = [
            ('id', 'in', self._new_sms.ids),
            ('partner_id', '=', partner.id),
            ('number', '=', number),
        ]
        if status:
            domain.append(('state', '=', status))

        sms = self.env['sms.sms'].sudo().search(domain)
        if content and len(sms) > 1:
            sms = sms.filtered(lambda s: content in (s.body or ""))
        if not sms:
            raise AssertionError('sms.sms not found for %s (number: %s / status %s)' % (partner, number, status))
        if len(sms) > 1:
            raise NotImplementedError(
                f'Found {len(sms)} sms.sms for {partner} (number: {number} / status {status})'
            )
        return sms

    def assertSMSIapSent(self, numbers, content=None):
        """ Check sent SMS. Order is not checked. Each number should have received
        the same content. Useful to check batch sending.

        :param numbers: list of numbers;
        :param content: content to check for each number;
        """
        for number in numbers:
            match = next((entry for entry in self._sms if entry['number'] == number), None)
            self.assertTrue(
                bool(match),
                'Number %s not found in %s' % (number, repr([s['number'] for s in self._sms])))
            if content is None:
                continue
            self.assertIn(content, match['body'])

    def assertSMS(self, partner, number, status, failure_type=None,
                  content=None, fields_values=None):
        """ Find a ``sms.sms`` record, based on given partner, number and status.

        :param partner: optional partner, used to find a ``sms.sms`` and a number
          if not given;
        :param number: optional number, used to find a ``sms.sms``, notably if
          partner is not given;
        :param status: expected ``state`` of the ``sms.sms``; when it is
          ``pending`` the number is also checked against SMS sent through the
          mocked gateway;
        :param failure_type: check failure type if SMS is not sent or outgoing;
        :param content: if given, should be contained in sms body;
        :param fields_values: optional values allowing to check directly some
          values on ``sms.sms`` record;
        """
        record = self._find_sms_sms(partner, number, status, content=content)
        if failure_type:
            self.assertEqual(record.failure_type, failure_type)
        if content is not None:
            self.assertIn(content, (record.body or ""))
        expected_values = fields_values or {}
        for fname, fvalue in expected_values.items():
            self.assertEqual(
                record[fname], fvalue,
                'SMS: expected %s for %s, got %s' % (fvalue, fname, record[fname]))
        if status == 'pending':
            self.assertSMSIapSent([record.number], content=content)

    def assertSMSCanceled(self, partner, number, failure_type, content=None, fields_values=None):
        """ Check canceled SMS. Search is done for a pair partner / number where
        partner can be an empty recordset. """
        self.assertSMS(
            partner, number, 'canceled',
            failure_type=failure_type, content=content, fields_values=fields_values)

    def assertSMSFailed(self, partner, number, failure_type, content=None, fields_values=None):
        """ Check failed SMS. Search is done for a pair partner / number where
        partner can be an empty recordset. """
        self.assertSMS(
            partner, number, 'error',
            failure_type=failure_type, content=content, fields_values=fields_values)

    def assertSMSOutgoing(self, partner, number, content=None, fields_values=None):
        """ Check outgoing SMS. Search is done for a pair partner / number where
        partner can be an empty recordset. """
        self.assertSMS(
            partner, number, 'outgoing',
            content=content, fields_values=fields_values)

    def assertNoSMSNotification(self, messages=None):
        """ Check that no SMS notification exists (optionally restricted to
        ``messages``) and that nothing went through the mocked gateway. """
        Notification = self.env['mail.notification']
        domain = [('notification_type', '=', 'sms')]
        if messages is not None:
            domain.append(('mail_message_id', 'in', messages.ids))
        self.assertEqual(Notification.search(domain), Notification)
        self.assertEqual(self._sms, [])

    def assertSMSNotification(self, recipients_info, content, messages=None, check_sms=True, sent_unlink=False,
                              mail_message_values=None):
        """ Check content of notifications and sms.

        :param recipients_info: list[{
            'partner': res.partner record (may be empty),
            'number': number used for notification (may be empty, computed based on partner),
            'state': ready / pending / sent / exception / canceled (pending by default),
            'failure_type': optional: sms_number_missing / sms_number_format / sms_credit / sms_server
            }, { ... }]
        :param content: SMS content
        :param messages: optional mail.message records; restricts the
          notification search and, when given, their body is compared to
          ``content``;
        :param check_sms: also check the matching ``sms.sms`` / sent SMS;
        :param sent_unlink: for process / pending / sent states, check the
          SMS sent through the mocked gateway instead of the ``sms.sms``
          record;
        :param mail_message_values: dictionary of expected mail message fields values
        """
        Partner = self.env['res.partner']
        partners = Partner.concat(*(info['partner'] for info in recipients_info if info.get('partner')))
        numbers = [info['number'] for info in recipients_info if info.get('number')]
        # special case of void notifications: check for False / False notifications
        if not partners and not numbers:
            numbers = [False]
        base_domain = [
            '|', ('res_partner_id', 'in', partners.ids),
            '&', ('res_partner_id', '=', False), ('sms_number', 'in', numbers),
            ('notification_type', '=', 'sms')
        ]
        if messages is not None:
            base_domain.append(('mail_message_id', 'in', messages.ids))
        notifications = self.env['mail.notification'].search(base_domain)

        self.assertEqual(notifications.mapped('res_partner_id'), partners)

        for recipient_info in recipients_info:
            partner = recipient_info.get('partner', Partner)
            number = recipient_info.get('number')
            state = recipient_info.get('state', 'pending')
            if number is None and partner:
                number = partner._phone_format()

            found = notifications.filtered(
                lambda n: n.res_partner_id == partner and n.sms_number == number and n.notification_status == state
            )

            # on failure, list every candidate notification to ease debugging
            debug_info = ''
            if not found:
                debug_info = '\n'.join(
                    f'To: {notif.sms_number} ({notif.res_partner_id}) - (State: {notif.notification_status})'
                    for notif in notifications
                )
            self.assertTrue(found, 'SMS: not found notification for %s (number: %s, state: %s)\n%s' % (partner, number, state, debug_info))
            self.assertEqual(found.author_id, found.mail_message_id.author_id, 'SMS: Message and notification should have the same author')
            for field_name, expected_value in (mail_message_values or {}).items():
                self.assertEqual(found.mail_message_id[field_name], expected_value)
            if state not in {'process', 'sent', 'ready', 'canceled', 'pending'}:
                self.assertEqual(found.failure_type, recipient_info['failure_type'])

            if not check_sms:
                continue
            if state in {'process', 'pending', 'sent'}:
                if sent_unlink:
                    self.assertSMSIapSent([number], content=content)
                else:
                    self.assertSMS(partner, number, state, content=content)
            elif state == 'ready':
                self.assertSMS(partner, number, 'outgoing', content=content)
            elif state in ('exception', 'canceled'):
                sms_state = 'error' if state == 'exception' else 'canceled'
                self.assertSMS(partner, number, sms_state, failure_type=recipient_info['failure_type'], content=content)
            else:
                raise NotImplementedError('Not implemented')

        if messages is not None:
            # additionally strip links when sanitizing before comparing bodies
            sanitize_tags = {**tools.mail.SANITIZE_TAGS}
            sanitize_tags['remove_tags'] = sanitize_tags['remove_tags'] + ['a']
            with patch('odoo.tools.mail.SANITIZE_TAGS', sanitize_tags):
                for message in messages:
                    plain_body = tools.html2plaintext(tools.html_sanitize(message.body)).rstrip('\n')
                    self.assertEqual(content, plain_body)

    def assertSMSLogged(self, records, body):
        """ Check that the last item of ``message_ids`` of each record is an
        SMS note whose plaintext body equals ``body``. """
        note_subtype = self.env.ref('mail.mt_note')
        for record in records:
            message = record.message_ids[-1]
            self.assertEqual(message.subtype_id, note_subtype)
            self.assertEqual(message.message_type, 'sms')
            self.assertEqual(tools.html2plaintext(message.body).rstrip('\n'), body)
class SMSCommon(MailCommon, SMSCase):
    """ Common SMS test class combining mail test data with the SMS helpers,
    and providing sets of Belgian test numbers (raw and E164-formatted). """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        def _sanitize_all(raw_numbers):
            # format each number as Belgian E164
            return [
                phone_validation.phone_format(number, 'BE', '32', force_format='E164')
                for number in raw_numbers
            ]

        cls.user_employee.write({'login': 'employee'})

        # update country to belgium in order to test sanitization of numbers
        belgium = cls.env.ref('base.be')
        cls.user_employee.company_id.write({'country_id': belgium.id})

        # some numbers for testing
        cls.random_numbers_str = '+32456998877, 0456665544'
        cls.random_numbers = cls.random_numbers_str.split(', ')
        cls.random_numbers_san = _sanitize_all(cls.random_numbers)
        cls.test_numbers = ['+32456010203', '0456 04 05 06', '0032456070809']
        cls.test_numbers_san = _sanitize_all(cls.test_numbers)

        # some numbers for mass testing
        cls.mass_numbers = ['04561%s2%s3%s' % (x, x, x) for x in range(10)]
        cls.mass_numbers_san = _sanitize_all(cls.mass_numbers)

    @classmethod
    def _create_sms_template(cls, model, body=False):
        """ Create a test ``sms.template`` for ``model``.

        :param model: model name, resolved through ``ir.model``;
        :param body: template body; a default body is used when falsy;
        """
        template_values = {
            'name': 'Test Template',
            'model_id': cls.env['ir.model']._get(model).id,
            'body': body or 'Dear {{ object.display_name }} this is an SMS.',
        }
        return cls.env['sms.template'].create(template_values)

    def _make_webhook_jsonrpc_request(self, statuses):
        """ Send ``statuses`` as ``message_statuses`` to the ``/sms/status``
        route and return the result of the JSON-RPC request. """
        payload = {'message_statuses': statuses}
        return self.make_jsonrpc_request('/sms/status', payload)