# -*- coding: utf-8 -*- from contextlib import contextmanager from freezegun import freeze_time from unittest.mock import patch from odoo import exceptions, tools from odoo.addons.mail.tests.common import MailCommon from odoo.addons.phone_validation.tools import phone_validation from odoo.addons.sms.models.sms_sms import SmsApi, SmsSms from odoo.tests import common class MockSMS(common.TransactionCase): def tearDown(self): super(MockSMS, self).tearDown() self._clear_sms_sent() # ------------------------------------------------------------ # UTILITY MOCKS # ------------------------------------------------------------ @contextmanager def mock_datetime_and_now(self, mock_dt): """ Used when synchronization date (using env.cr.now()) is important in addition to standard datetime mocks. Used mainly to detect sync issues. """ with freeze_time(mock_dt), \ patch.object(self.env.cr, 'now', lambda: mock_dt): yield # ------------------------------------------------------------ # GATEWAY MOCK # ------------------------------------------------------------ @contextmanager def mockSMSGateway(self, sms_allow_unlink=False, sim_error=None, nbr_t_error=None, moderated=False, force_delivered=False): self._clear_sms_sent() sms_create_origin = SmsSms.create sms_send_origin = SmsSms._send def _contact_iap(local_endpoint, params): # mock single sms sending if local_endpoint == '/iap/message_send': self._sms += [{ 'number': number, 'body': params['message'], } for number in params['numbers']] return True # send_message v0 API returns always True # mock batch sending if local_endpoint == '/iap/sms/2/send': result = [] for to_send in params['messages']: res = {'res_id': to_send['res_id'], 'state': 'delivered' if force_delivered else 'success', 'credit': 1} error = sim_error or (nbr_t_error and nbr_t_error.get(to_send['number'])) if error and error == 'credit': res.update(credit=0, state='insufficient_credit') elif error and error in {'wrong_number_format', 'unregistered', 'server_error'}: res.update(state=error) elif error and error == 'jsonrpc_exception': raise exceptions.AccessError( 'The url that this service requested returned an error. Please contact the author of the app. The url it tried to contact was ' + local_endpoint ) result.append(res) if res['state'] == 'success' or res['state'] == 'delivered': self._sms.append({ 'number': to_send['number'], 'body': to_send['content'], }) return result elif local_endpoint == '/api/sms/3/send': result = [] for message in params['messages']: for number in message["numbers"]: error = sim_error or (nbr_t_error and nbr_t_error.get(number['number'])) if error == 'jsonrpc_exception': raise exceptions.AccessError( 'The url that this service requested returned an error. ' 'Please contact the author of the app. ' 'The url it tried to contact was ' + local_endpoint ) elif error == 'credit': error = 'insufficient_credit' res = { 'uuid': number['uuid'], 'state': error or ('delivered' if force_delivered else 'success' if not moderated else 'processing'), 'credit': 1, } if error: # credit is only given if the amount is known res.update(credit=0) else: self._sms.append({ 'number': number['number'], 'body': message['content'], 'uuid': number['uuid'], }) result.append(res) return result def _sms_sms_create(model, *args, **kwargs): res = sms_create_origin(model, *args, **kwargs) self._new_sms += res.sudo() return res def _sms_sms_send(records, unlink_failed=False, unlink_sent=True, raise_exception=False): if sms_allow_unlink: return sms_send_origin(records, unlink_failed=unlink_failed, unlink_sent=unlink_sent, raise_exception=raise_exception) return sms_send_origin(records, unlink_failed=False, unlink_sent=False, raise_exception=raise_exception) try: with patch.object(SmsApi, '_contact_iap', side_effect=_contact_iap), \ patch.object(SmsSms, 'create', autospec=True, wraps=SmsSms, side_effect=_sms_sms_create), \ patch.object(SmsSms, '_send', autospec=True, wraps=SmsSms, side_effect=_sms_sms_send): yield finally: pass def _clear_sms_sent(self): self._sms = [] self._new_sms = self.env['sms.sms'].sudo() def _clear_outgoing_sms(self): """ As SMS gateway mock keeps SMS, we may need to remove them manually if there are several tests in the same tx. """ self.env['sms.sms'].sudo().search([('state', '=', 'outgoing')]).unlink() class SMSCase(MockSMS): """ Main test class to use when testing SMS integrations. Contains helpers and tools related to notification sent by SMS. """ @classmethod def setUpClass(cls): super().setUpClass() # This is called to make sure that an iap_account for sms already exists or if not is created. cls.env['iap.account'].get('sms') def _find_sms_sent(self, partner, number): if number is None and partner: number = partner._phone_format() sent_sms = next((sms for sms in self._sms if sms['number'] == number), None) if not sent_sms: raise AssertionError('sent sms not found for %s (number: %s)' % (partner, number)) return sent_sms def _find_sms_sms(self, partner, number, status, content=None): if number is None and partner: number = partner._phone_format() domain = [('id', 'in', self._new_sms.ids), ('partner_id', '=', partner.id), ('number', '=', number)] if status: domain += [('state', '=', status)] sms = self.env['sms.sms'].sudo().search(domain) if len(sms) > 1 and content: sms = sms.filtered(lambda s: content in (s.body or "")) if not sms: raise AssertionError('sms.sms not found for %s (number: %s / status %s)' % (partner, number, status)) if len(sms) > 1: raise NotImplementedError( f'Found {len(sms)} sms.sms for {partner} (number: {number} / status {status})' ) return sms def assertSMSIapSent(self, numbers, content=None): """ Check sent SMS. Order is not checked. Each number should have received the same content. Useful to check batch sending. :param numbers: list of numbers; :param content: content to check for each number; """ for number in numbers: sent_sms = next((sms for sms in self._sms if sms['number'] == number), None) self.assertTrue(bool(sent_sms), 'Number %s not found in %s' % (number, repr([s['number'] for s in self._sms]))) if content is not None: self.assertIn(content, sent_sms['body']) def assertSMS(self, partner, number, status, failure_type=None, content=None, fields_values=None): """ Find a ``sms.sms`` record, based on given partner, number and status. :param partner: optional partner, used to find a ``sms.sms`` and a number if not given; :param number: optional number, used to find a ``sms.sms``, notably if partner is not given; :param failure_type: check failure type if SMS is not sent or outgoing; :param content: if given, should be contained in sms body; :param fields_values: optional values allowing to check directly some values on ``sms.sms`` record; """ sms_sms = self._find_sms_sms(partner, number, status, content=content) if failure_type: self.assertEqual(sms_sms.failure_type, failure_type) if content is not None: self.assertIn(content, (sms_sms.body or "")) for fname, fvalue in (fields_values or {}).items(): self.assertEqual( sms_sms[fname], fvalue, 'SMS: expected %s for %s, got %s' % (fvalue, fname, sms_sms[fname])) if status == 'pending': self.assertSMSIapSent([sms_sms.number], content=content) def assertSMSCanceled(self, partner, number, failure_type, content=None, fields_values=None): """ Check canceled SMS. Search is done for a pair partner / number where partner can be an empty recordset. """ self.assertSMS(partner, number, 'canceled', failure_type=failure_type, content=content, fields_values=fields_values) def assertSMSFailed(self, partner, number, failure_type, content=None, fields_values=None): """ Check failed SMS. Search is done for a pair partner / number where partner can be an empty recordset. """ self.assertSMS(partner, number, 'error', failure_type=failure_type, content=content, fields_values=fields_values) def assertSMSOutgoing(self, partner, number, content=None, fields_values=None): """ Check outgoing SMS. Search is done for a pair partner / number where partner can be an empty recordset. """ self.assertSMS(partner, number, 'outgoing', content=content, fields_values=fields_values) def assertNoSMSNotification(self, messages=None): base_domain = [('notification_type', '=', 'sms')] if messages is not None: base_domain += [('mail_message_id', 'in', messages.ids)] self.assertEqual(self.env['mail.notification'].search(base_domain), self.env['mail.notification']) self.assertEqual(self._sms, []) def assertSMSNotification(self, recipients_info, content, messages=None, check_sms=True, sent_unlink=False, mail_message_values=None): """ Check content of notifications and sms. :param recipients_info: list[{ 'partner': res.partner record (may be empty), 'number': number used for notification (may be empty, computed based on partner), 'state': ready / pending / sent / exception / canceled (pending by default), 'failure_type': optional: sms_number_missing / sms_number_format / sms_credit / sms_server }, { ... }] :param content: SMS content :param mail_message_values: dictionary of expected mail message fields values """ partners = self.env['res.partner'].concat(*list(p['partner'] for p in recipients_info if p.get('partner'))) numbers = [p['number'] for p in recipients_info if p.get('number')] # special case of void notifications: check for False / False notifications if not partners and not numbers: numbers = [False] base_domain = [ '|', ('res_partner_id', 'in', partners.ids), '&', ('res_partner_id', '=', False), ('sms_number', 'in', numbers), ('notification_type', '=', 'sms') ] if messages is not None: base_domain += [('mail_message_id', 'in', messages.ids)] notifications = self.env['mail.notification'].search(base_domain) self.assertEqual(notifications.mapped('res_partner_id'), partners) for recipient_info in recipients_info: partner = recipient_info.get('partner', self.env['res.partner']) number = recipient_info.get('number') state = recipient_info.get('state', 'pending') if number is None and partner: number = partner._phone_format() notif = notifications.filtered(lambda n: n.res_partner_id == partner and n.sms_number == number and n.notification_status == state) debug_info = '' if not notif: debug_info = '\n'.join( f'To: {notif.sms_number} ({notif.res_partner_id}) - (State: {notif.notification_status})' for notif in notifications ) self.assertTrue(notif, 'SMS: not found notification for %s (number: %s, state: %s)\n%s' % (partner, number, state, debug_info)) self.assertEqual(notif.author_id, notif.mail_message_id.author_id, 'SMS: Message and notification should have the same author') for field_name, expected_value in (mail_message_values or {}).items(): self.assertEqual(notif.mail_message_id[field_name], expected_value) if state not in {'process', 'sent', 'ready', 'canceled', 'pending'}: self.assertEqual(notif.failure_type, recipient_info['failure_type']) if check_sms: if state in {'process', 'pending', 'sent'}: if sent_unlink: self.assertSMSIapSent([number], content=content) else: self.assertSMS(partner, number, state, content=content) elif state == 'ready': self.assertSMS(partner, number, 'outgoing', content=content) elif state == 'exception': self.assertSMS(partner, number, 'error', failure_type=recipient_info['failure_type'], content=content) elif state == 'canceled': self.assertSMS(partner, number, 'canceled', failure_type=recipient_info['failure_type'], content=content) else: raise NotImplementedError('Not implemented') if messages is not None: sanitize_tags = {**tools.mail.SANITIZE_TAGS} sanitize_tags['remove_tags'] = [*sanitize_tags['remove_tags'] + ['a']] with patch('odoo.tools.mail.SANITIZE_TAGS', sanitize_tags): for message in messages: self.assertEqual(content, tools.html2plaintext(tools.html_sanitize(message.body)).rstrip('\n')) def assertSMSLogged(self, records, body): for record in records: message = record.message_ids[-1] self.assertEqual(message.subtype_id, self.env.ref('mail.mt_note')) self.assertEqual(message.message_type, 'sms') self.assertEqual(tools.html2plaintext(message.body).rstrip('\n'), body) class SMSCommon(MailCommon, SMSCase): @classmethod def setUpClass(cls): super(SMSCommon, cls).setUpClass() cls.user_employee.write({'login': 'employee'}) # update country to belgium in order to test sanitization of numbers cls.user_employee.company_id.write({'country_id': cls.env.ref('base.be').id}) # some numbers for testing cls.random_numbers_str = '+32456998877, 0456665544' cls.random_numbers = cls.random_numbers_str.split(', ') cls.random_numbers_san = [phone_validation.phone_format(number, 'BE', '32', force_format='E164') for number in cls.random_numbers] cls.test_numbers = ['+32456010203', '0456 04 05 06', '0032456070809'] cls.test_numbers_san = [phone_validation.phone_format(number, 'BE', '32', force_format='E164') for number in cls.test_numbers] # some numbers for mass testing cls.mass_numbers = ['04561%s2%s3%s' % (x, x, x) for x in range(0, 10)] cls.mass_numbers_san = [phone_validation.phone_format(number, 'BE', '32', force_format='E164') for number in cls.mass_numbers] @classmethod def _create_sms_template(cls, model, body=False): return cls.env['sms.template'].create({ 'name': 'Test Template', 'model_id': cls.env['ir.model']._get(model).id, 'body': body if body else 'Dear {{ object.display_name }} this is an SMS.' }) def _make_webhook_jsonrpc_request(self, statuses): return self.make_jsonrpc_request('/sms/status', {'message_statuses': statuses})