# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
from urllib.parse import urljoin
from odoo import api, fields, models, _
from odoo.addons.link_tracker.models.link_tracker import LINK_TRACKER_MIN_CODE_LENGTH
from odoo.exceptions import UserError
from odoo.osv import expression
_logger = logging.getLogger(__name__)
class Mailing(models.Model):
_inherit = 'mailing.mailing'
@api.model
def default_get(self, fields):
    """Extend the default values so SMS mailings keep archives by default.

    :param fields: names of the fields for which defaults are requested;
      forwarded unchanged to the parent implementation
    :return dict: defaults computed by the parent, with ``keep_archives``
      forced to ``True`` when that field is requested and the default
      ``mailing_type`` is ``'sms'``
    """
    # ``default_get`` operates on the model, not on records: the override is
    # declared with ``@api.model`` so it is dispatched as a model-level method.
    res = super(Mailing, self).default_get(fields)
    keep_archives_requested = fields is not None and 'keep_archives' in fields
    if keep_archives_requested and res.get('mailing_type') == 'sms':
        res['keep_archives'] = True
    return res
# mailing options
mailing_type = fields.Selection(selection_add=[
    ('sms', 'SMS')
], ondelete={'sms': 'set default'})

# 'sms_subject' added to override 'subject' field (string attribute should be labelled "Title" when mailing_type == 'sms').
# 'sms_subject' should have the same helper as 'subject' field when 'mass_mailing_sms' is installed,
# otherwise 'sms_subject' will get the old helper from the 'mass_mailing' module.
# Overriding the 'subject' field helper in this model does not work, since the helper would keep the new value
# even when 'mass_mailing_sms' is removed (see 'mailing_mailing_view_form_sms' for more details).
sms_subject = fields.Char(
    'Title', related='subject',
    readonly=False, translate=False,
    help='For an email, the subject your recipients will see in their inbox.\n'
         'For an SMS, the internal title of the message.')

# sms options
# body is pre-filled from the selected SMS template (see _compute_body_plaintext) but stays editable
body_plaintext = fields.Text(
    'SMS Body', compute='_compute_body_plaintext',
    store=True, readonly=False)
sms_template_id = fields.Many2one('sms.template', string='SMS Template', ondelete='set null')
sms_has_insufficient_credit = fields.Boolean(
    'Insufficient IAP credits', compute='_compute_sms_has_iap_failure')  # used to propose buying IAP credits
sms_has_unregistered_account = fields.Boolean(
    'Unregistered IAP account', compute='_compute_sms_has_iap_failure')  # used to propose to Register the SMS IAP account
sms_force_send = fields.Boolean(
    'Send Directly', help='Immediately send the SMS Mailing instead of queuing up. Use at your own risk.')

# opt_out_link
sms_allow_unsubscribe = fields.Boolean('Include opt-out link', default=False)

# A/B Testing
# A Selection field needs either its own ``selection`` or a ``related`` source.
# NOTE(review): ``related`` restored to point at the campaign field of the same
# name, which this model already reads through ``campaign_id`` — confirm.
ab_testing_sms_winner_selection = fields.Selection(
    related="campaign_id.ab_testing_sms_winner_selection",
    default="clicks_ratio", readonly=False, copy=True)
ab_testing_mailings_sms_count = fields.Integer(related="campaign_id.ab_testing_mailings_sms_count")
def _compute_medium_id(self):
    """Align the UTM medium with the mailing type.

    After the parent computation, an SMS mailing with no medium or with the
    'email' medium gets the 'sms' medium; a mail mailing with no medium or
    with the 'sms' medium gets the 'email' medium. Any other medium is kept.
    """
    super(Mailing, self)._compute_medium_id()
    Medium = self.env['utm.medium']
    for record in self:
        if record.mailing_type == 'sms':
            # only replace an unset medium or the 'email' one
            if not record.medium_id or record.medium_id == Medium._fetch_or_create_utm_medium('email'):
                record.medium_id = Medium._fetch_or_create_utm_medium("sms", module="mass_mailing_sms").id
        elif record.mailing_type == 'mail':
            # symmetric case: only replace an unset medium or the 'sms' one
            if not record.medium_id or record.medium_id == Medium._fetch_or_create_utm_medium("sms", module="mass_mailing_sms"):
                record.medium_id = Medium._fetch_or_create_utm_medium('email').id
@api.depends('sms_template_id', 'mailing_type')
def _compute_body_plaintext(self):
    """Copy the body of the selected SMS template into the SMS body.

    Only SMS mailings that have a template are written; other records are
    not assigned here.
    """
    for record in self:
        if record.mailing_type != 'sms' or not record.sms_template_id:
            continue
        record.body_plaintext = record.sms_template_id.body
def _compute_sms_has_iap_failure(self):
    """Flag mailings having SMS traces that failed for an IAP-related reason.

    Both flags are reset on all records, then set to ``True`` for each
    mailing having at least one SMS trace with failure type ``sms_credit``
    (insufficient credit) or ``sms_acc`` (unregistered account).
    """
    self.sms_has_insufficient_credit = False
    self.sms_has_unregistered_account = False

    failure_domain = [
        ('mass_mailing_id', 'in', self.ids),
        ('trace_type', '=', 'sms'),
        ('failure_type', 'in', ['sms_acc', 'sms_credit'])
    ]
    # traces are read with sudo; results are grouped by (mailing, failure type)
    grouped_failures = self.env['mailing.trace'].sudo()._read_group(
        failure_domain, ['mass_mailing_id', 'failure_type'])

    flag_per_failure = {
        'sms_credit': 'sms_has_insufficient_credit',
        'sms_acc': 'sms_has_unregistered_account',
    }
    for mailing, failure in grouped_failures:
        flag_name = flag_per_failure.get(failure)
        if flag_name:
            mailing[flag_name] = True
# --------------------------------------------------
# ORM OVERRIDES
# --------------------------------------------------
@api.model_create_multi
def create(self, vals_list):
    """Create mailings, copying the SMS title into ``subject`` beforehand.

    :param list vals_list: creation values, one dict per record. Dicts that
      describe an SMS mailing with a ``sms_subject`` are updated in place.
    :return: the result of the parent ``create``
    """
    # ``vals_list`` is iterated as a list of dicts, which is the calling
    # convention declared by ``api.model_create_multi``.
    for vals in vals_list:
        # Get subject from "sms_subject" field when SMS installed (used to
        # build the name of record in the super 'create' method)
        if vals.get('mailing_type') == 'sms' and vals.get('sms_subject'):
            vals['subject'] = vals['sms_subject']
    return super().create(vals_list)
# --------------------------------------------------
# BUSINESS / VIEWS ACTIONS
# --------------------------------------------------
def action_retry_failed(self):
    """Retry failed sendings, routing SMS mailings to the SMS-specific retry.

    SMS mailings are handled by ``action_retry_failed_sms``; all remaining
    mailings are always delegated to the parent implementation, including
    when the recordset contains no SMS mailing at all.

    :return: the result of the parent ``action_retry_failed`` called on the
      non-SMS mailings
    """
    mass_sms = self.filtered(lambda m: m.mailing_type == 'sms')
    if mass_sms:
        mass_sms.action_retry_failed_sms()
    return super(Mailing, self - mass_sms).action_retry_failed()
def action_retry_failed_sms(self):
    """Look up the SMS of these mailings that are in ``error`` state.

    The lookup is done with sudo on ``sms.sms``.
    """
    failed_sms = self.env['sms.sms'].sudo().search([
        ('mailing_id', 'in', self.ids),
        ('state', '=', 'error'),
    ])
    # NOTE(review): the search call was left unterminated and nothing after it
    # is available here, so ``failed_sms`` is currently unused. The follow-up
    # (presumably cleaning the failed SMS and re-queuing the mailings) must be
    # restored from the reference implementation — TODO confirm.
def action_test(self):
    """Open the test wizard, using the SMS-specific wizard for SMS mailings.

    :return dict: for an SMS mailing, a window action opening a new
      ``mailing.sms.test`` form in a dialog, with this mailing set as default
      in the context; otherwise the result of the parent ``action_test``
    """
    if self.mailing_type == 'sms':
        ctx = dict(self.env.context, default_mailing_id=self.id, dialog_size='medium')
        return {
            'name': _('Test Mailing'),
            'type': 'ir.actions.act_window',
            'view_mode': 'form',
            'res_model': 'mailing.sms.test',
            'target': 'new',
            'context': ctx,
        }
    return super(Mailing, self).action_test()
def _action_view_traces_filtered(self, view_filter):
    """Return the traces action, using the SMS trace views for SMS mailings.

    :param view_filter: forwarded unchanged to the parent implementation
    :return: the parent action; for an SMS mailing its ``views`` key is
      replaced by the SMS-specific list and form views
    """
    action = super(Mailing, self)._action_view_traces_filtered(view_filter)
    if self.mailing_type != 'sms':
        return action
    list_view = self.env.ref('mass_mailing_sms.mailing_trace_view_tree_sms')
    form_view = self.env.ref('mass_mailing_sms.mailing_trace_view_form_sms')
    action['views'] = [(list_view.id, 'list'), (form_view.id, 'form')]
    return action
def action_buy_sms_credits(self):
    """Return a URL action pointing to the IAP credits page for SMS.

    :return dict: an ``ir.actions.act_url`` action whose URL is given by
      ``iap.account.get_credits_url`` for the ``sms`` service
    """
    url = self.env['iap.account'].get_credits_url(service_name='sms')
    return {
        'type': 'ir.actions.act_url',
        'url': url,
    }
# --------------------------------------------------
# SMS SEND
# --------------------------------------------------
def _get_opt_out_list_sms(self):
    """ Give list of opt-outed records, depending on specific model-based
    computation if available.

    The computation is delegated to ``_mailing_get_opt_out_list_sms`` of the
    model named by ``mailing_model_name`` when that model defines it;
    otherwise the list is empty.

    :return list: opt-outed record IDs
    """
    opt_out = []
    target = self.env[self.mailing_model_real]
    mailing_model = self.env[self.mailing_model_name]
    if hasattr(mailing_model, '_mailing_get_opt_out_list_sms'):
        opt_out = mailing_model._mailing_get_opt_out_list_sms(self)
        _logger.info("Mass SMS %s targets %s: optout: %s contacts", self, target._name, len(opt_out))
    else:
        # the two log messages are mutually exclusive: this one is for models
        # providing no opt-out computation
        _logger.info("Mass SMS %s targets %s: no opt out list available", self, target._name)
    return opt_out
def _get_seen_list_sms(self):
    """Return the records and phone numbers already targeted by this mailing.

    Existing traces of this mailing are joined with the target table. Numbers
    are read either from the phone fields of the target model or, when it has
    none, from the ``mobile`` / ``phone`` columns of the partner linked
    through a stored many2one partner field.

    :return tuple: ``(seen_ids, seen_list)`` where ``seen_ids`` is the list of
      target record ids having a trace and ``seen_list`` the list of distinct
      non-empty numbers found on them
    :raise UserError: if the target model has neither a usable phone field
      nor a usable partner field
    """
    target = self.env[self.mailing_model_real]

    partner_fields = []
    if isinstance(target, self.pool['mail.thread.phone']):
        phone_fields = ['phone_sanitized']
    else:
        # keep only the number fields that exist on the model and are stored,
        # since they are read directly in SQL
        phone_fields = [
            fname for fname in target._phone_get_number_fields()
            if fname in target._fields and target._fields[fname].store
        ]
        partner_fields = target._mail_get_partner_fields()
    partner_field = next(
        (fname for fname in partner_fields if target._fields[fname].store and target._fields[fname].type == 'many2one'),
        None,
    )
    if not phone_fields and not partner_field:
        raise UserError(_("Unsupported %s for mass SMS", self.mailing_model_id.name))

    # Single %(...)s placeholders are filled below with SQL fragments built
    # from model metadata (table and field names); doubled %%(...)s ones
    # remain as query parameters bound by the cursor.
    query = """
        SELECT %(select_query)s
          FROM mailing_trace trace
          JOIN %(target_table)s target ON (trace.res_id = target.id)
          %(join_add_query)s
         WHERE (%(where_query)s)
           AND trace.mass_mailing_id = %%(mailing_id)s
           AND trace.model = %%(target_model)s
    """
    if phone_fields:
        # phone fields are checked on target mailed model
        select_query = 'target.id, ' + ', '.join('target.%s' % fname for fname in phone_fields)
        where_query = ' OR '.join('target.%s IS NOT NULL' % fname for fname in phone_fields)
        join_add_query = ''
    else:
        # phone fields are checked on res.partner model
        partner_phone_fields = ['mobile', 'phone']
        select_query = 'target.id, ' + ', '.join('partner.%s' % fname for fname in partner_phone_fields)
        where_query = ' OR '.join('partner.%s IS NOT NULL' % fname for fname in partner_phone_fields)
        join_add_query = 'JOIN res_partner partner ON (target.%s = partner.id)' % partner_field

    query = query % {
        'select_query': select_query,
        'where_query': where_query,
        'target_table': target._table,
        'join_add_query': join_add_query,
    }
    params = {'mailing_id': self.id, 'target_model': self.mailing_model_real}
    self._cr.execute(query, params)
    query_res = self._cr.fetchall()
    # each row is (target id, number, number, ...): keep non-empty numbers
    seen_list = {number for item in query_res for number in item[1:] if number}
    seen_ids = {item[0] for item in query_res}
    _logger.info("Mass SMS %s targets %s: already reached %s SMS", self, target._name, len(seen_list))
    return list(seen_ids), list(seen_list)
def _send_sms_get_composer_values(self, res_ids):
    """Build the values used to create the SMS composer for this mailing.

    :param res_ids: ids of the records to send to; stored as their ``repr``
    :return dict: composer creation values (content and mass-mode options)
    """
    return {
        # content
        'body': self.body_plaintext,
        'template_id': self.sms_template_id.id,
        'res_model': self.mailing_model_real,
        'res_ids': repr(res_ids),
        # options
        'composition_mode': 'mass',
        'mailing_id': self.id,
        'mass_keep_log': self.keep_archives,
        'mass_force_send': self.sms_force_send,
        'mass_sms_allow_unsubscribe': self.sms_allow_unsubscribe,
    }
def _action_send_mail(self, res_ids=None):
    """Send the mailings, routing SMS mailings to ``action_send_sms``.

    All remaining mailings are always delegated to the parent
    implementation, including when the recordset contains no SMS mailing.

    :param res_ids: optional recipient record ids, forwarded to both paths
    :return: the result of the parent ``_action_send_mail`` called on the
      non-SMS mailings
    """
    mass_sms = self.filtered(lambda m: m.mailing_type == 'sms')
    if mass_sms:
        mass_sms.action_send_sms(res_ids=res_ids)
    return super(Mailing, self - mass_sms)._action_send_mail(res_ids=res_ids)
def action_send_sms(self, res_ids=None):
    """Create an SMS composer for each mailing that has recipients.

    :param res_ids: optional recipient record ids; when not given, each
      mailing uses its own ``_get_remaining_recipients()``
    :return bool: always ``True``
    """
    for mailing in self:
        # Resolve recipients per mailing: overwriting ``res_ids`` itself would
        # make every following mailing reuse the first mailing's recipients.
        mailing_res_ids = res_ids or mailing._get_remaining_recipients()
        if mailing_res_ids:
            composer = self.env['sms.composer'].with_context(active_id=False).create(
                mailing._send_sms_get_composer_values(mailing_res_ids))
            # NOTE(review): the composer is created but nothing here triggers
            # the actual sending or updates the mailing afterwards; those
            # steps appear to be missing — TODO restore from the reference
            # implementation.
    return True
# ------------------------------------------------------
# STATISTICS
# ------------------------------------------------------
def _prepare_statistics_email_values(self):
    """Return some statistics that will be displayed in the mailing statistics email.

    Each item in the returned list will be displayed as a table, with a title and
    1, 2 or 3 columns.

    For an SMS mailing, the title and the first KPI entry computed by the
    parent are replaced by SMS-specific ones (received / clicked / bounced).
    """
    values = super(Mailing, self)._prepare_statistics_email_values()
    if self.mailing_type == 'sms':
        mailing_type = self._get_pretty_mailing_type()
        values['title'] = _(
            '24H Stats of %(mailing_type)s "%(mailing_name)s"',
            mailing_type=mailing_type,
            # NOTE(review): the named arguments of this call were missing;
            # ``subject`` is assumed to be the mailing name — confirm.
            mailing_name=self.subject,
        )
        values['kpi_data'][0] = {
            'kpi_fullname': _(
                'Report for %(expected)i %(mailing_type)s Sent',
                # NOTE(review): ``self.expected`` is assumed to hold the
                # number of expected recipients — confirm the field name.
                expected=self.expected,
                mailing_type=mailing_type,
            ),
            'kpi_col1': {
                'value': f'{self.received_ratio}%',
                'col_subtitle': _('RECEIVED (%i)', self.delivered),
            },
            'kpi_col2': {
                'value': f'{self.clicks_ratio}%',
                'col_subtitle': _('CLICKED (%i)', self.clicked),
            },
            'kpi_col3': {
                'value': f'{self.bounced_ratio}%',
                'col_subtitle': _('BOUNCED (%i)', self.bounced),
            },
            'kpi_action': None,
            'kpi_name': self.mailing_type,
        }
    return values
def _get_pretty_mailing_type(self):
    """Return the human-readable label of the mailing type.

    :return: the translated ``'SMS Text Message'`` for SMS mailings, the
      parent's label otherwise
    """
    if self.mailing_type != 'sms':
        return super(Mailing, self)._get_pretty_mailing_type()
    return _('SMS Text Message')
# --------------------------------------------------
# TOOLS
# --------------------------------------------------
def _get_default_mailing_domain(self):
    """Return the default recipients domain, excluding blacklisted phones.

    For an SMS mailing whose target model has a
    ``phone_sanitized_blacklisted`` field, the parent domain is AND-ed with a
    condition requiring that field to be ``False``.

    :return: the parent domain, possibly restricted as described above
    """
    domain = super(Mailing, self)._get_default_mailing_domain()
    if self.mailing_type != 'sms':
        return domain
    if 'phone_sanitized_blacklisted' not in self.env[self.mailing_model_name]._fields:
        return domain
    not_blacklisted = [('phone_sanitized_blacklisted', '=', False)]
    return expression.AND([domain, not_blacklisted])
def convert_links(self):
    """Return the mailing bodies with their links shortened, per mailing id.

    SMS mailings have the links of their plaintext body shortened here; all
    other mailings are handled by the parent implementation.

    :return dict: ``{mailing id: converted body}``
    """
    sms_mailings = self.filtered(lambda m: m.mailing_type == 'sms')
    converted = {}
    for sms_mailing in sms_mailings:
        link_values = sms_mailing._get_link_tracker_values()
        converted[sms_mailing.id] = sms_mailing._shorten_links_text(
            sms_mailing.body_plaintext, link_values)
    other_mailings = self - sms_mailings
    converted.update(super(Mailing, other_mailings).convert_links())
    return converted
def get_sms_link_replacements_placeholders(self):
    """Get placeholders for replaced links in sms widget for accurate computation of sms counts.

    Reminders and assumptions:
      * Links will be transformed to the format "[base_url]/r/[link_tracker_code]/s/[sms_id]".
      * unsubscribe is formatted as: "\\nSTOP SMS : [base_url]/sms/[mailing_id]/[trace_code]".

    Variable parts are replaced by runs of ``x`` whose lengths are estimated
    from the latest existing sms, link tracker code and mailing ids.

    :return: Character counts used for links, formatted as `{link: str, unsubscribe: str}`.
    """
    if self:
        # NOTE(review): the body of this guard was missing; ``self.id`` is
        # read below, so a singleton check is assumed here — confirm.
        self.ensure_one()

    max_sms = self.env['sms.sms'].sudo().search_read([], ['id'], order='id desc', limit=1)
    sms_id_length = max(len(str(max_sms[0]['id'])), 5) if max_sms else 5  # Assumes a mailing won't be more than 10⁵ sms at once
    max_code = self.env['link.tracker.code'].sudo().search_read([], ['code'], order='id DESC', limit=1)
    # one more character than the latest code, or the minimum length when no code exists yet
    code_length = len(max_code[0]['code']) + 1 if max_code else LINK_TRACKER_MIN_CODE_LENGTH

    if self.id:
        mailing_id_placeholder_length = len(str(self.id))
    else:
        # no id available: size the placeholder after the id following the highest existing one
        max_mailing = self.env['mailing.mailing'].sudo().search_read([], ['id'], order='id DESC', limit=1)
        mailing_id_placeholder_length = len(str(max_mailing[0]['id'] + 1)) if max_mailing else 1
    mailing_id_placeholder = 'x' * mailing_id_placeholder_length

    base_url = self.get_base_url()
    opt_out_url = urljoin(base_url, f"sms/{mailing_id_placeholder}/{'x' * self.env['mailing.trace'].CODE_SIZE}")
    return {
        'link': urljoin(base_url, f"r/{'x' * code_length}/s/{'x' * sms_id_length}"),
        'unsubscribe': f"\n{self.env['sms.composer']._get_unsubscribe_info(opt_out_url)}"
    }
# ------------------------------------------------------
# A/B Test Override
# ------------------------------------------------------
def _get_ab_testing_description_modifying_fields(self):
    """Return the parent's field names plus the SMS winner selection field.

    :return: the parent's list with ``'ab_testing_sms_winner_selection'``
      appended
    """
    return super()._get_ab_testing_description_modifying_fields() + [
        'ab_testing_sms_winner_selection',
    ]
def _get_ab_testing_description_values(self):
    """Return the A/B testing description values, adapted for SMS mailings.

    For an SMS mailing, the count and winner selection entries of the parent
    values are replaced by their SMS counterparts.

    :return: the parent's values, updated as described above
    """
    values = super()._get_ab_testing_description_values()
    if self.mailing_type == 'sms':
        # NOTE(review): the call wrapping these two entries was missing;
        # updating the parent's ``values`` is assumed — confirm.
        values.update({
            'ab_testing_count': self.ab_testing_mailings_sms_count,
            'ab_testing_winner_selection': self.ab_testing_sms_winner_selection,
        })
    return values
def _get_ab_testing_winner_selection(self):
    """Return the A/B testing winner selection, adapted for SMS mailings.

    For an SMS mailing, ``value`` is the campaign's SMS winner selection and
    ``description`` is the label of that selection value.

    :return: the parent's result, updated as described above
    """
    result = super()._get_ab_testing_winner_selection()
    if self.mailing_type == 'sms':
        winner_selection = self.campaign_id.ab_testing_sms_winner_selection
        # NOTE(review): the argument of the original ``dict(...)`` call and
        # the statement consuming 'value'/'description' were missing. The
        # label lookup below goes through ``fields_get`` and the entries are
        # assumed to update the parent's result — confirm.
        field_name = 'ab_testing_sms_winner_selection'
        selection_labels = dict(
            self.fields_get([field_name], ['selection'])[field_name]['selection']
        )
        ab_testing_winner_selection_description = selection_labels.get(winner_selection)
        result.update({
            'value': winner_selection,
            'description': ab_testing_winner_selection_description
        })
    return result
def _get_ab_testing_siblings_mailings(self):
    """Return the mailings taking part in the same A/B test.

    :return: for an SMS mailing, the campaign's SMS mailings having
      ``ab_testing_enabled`` set; otherwise the parent's result
    """
    siblings = super()._get_ab_testing_siblings_mailings()
    if self.mailing_type != 'sms':
        return siblings
    return self.campaign_id.mailing_sms_ids.filtered('ab_testing_enabled')
def _get_default_ab_testing_campaign_values(self, values=None):
    """Return default values of the A/B testing campaign for this mailing.

    For an SMS mailing, the campaign name is derived from the SMS title
    (taken from ``values['sms_subject']`` when provided, else from the
    record) and the SMS winner selection is copied from the mailing.

    :param dict values: optional values, forwarded unchanged to the parent
    :return: the parent's campaign values, completed as described above
    """
    campaign_values = super()._get_default_ab_testing_campaign_values(values)
    if self.mailing_type != 'sms':
        return campaign_values

    provided = values or dict()
    title = provided.get('sms_subject') or self.sms_subject
    if title:
        campaign_values['name'] = _("A/B Test: %s", title)
    # indentation was lost in the file: the winner selection is set for
    # every SMS mailing, independently of the title
    campaign_values['ab_testing_sms_winner_selection'] = self.ab_testing_sms_winner_selection
    return campaign_values