# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import ast import re from markupsafe import Markup from odoo import _, api, fields, models from odoo.exceptions import ValidationError, UserError from odoo.tools import is_html_empty, remove_accents # see rfc5322 section 3.2.3 atext = r"[a-zA-Z0-9!#$%&'*+\-/=?^_`{|}~]" dot_atom_text = re.compile(r"^%s+(\.%s+)*$" % (atext, atext)) class Alias(models.Model): """A Mail Alias is a mapping of an email address with a given Odoo Document model. It is used by Odoo's mail gateway when processing incoming emails sent to the system. If the recipient address (To) of the message matches a Mail Alias, the message will be either processed following the rules of that alias. If the message is a reply it will be attached to the existing discussion on the corresponding record, otherwise a new record of the corresponding model will be created. This is meant to be used in combination with a catch-all email configuration on the company's mail server, so that as soon as a new mail.alias is created, it becomes immediately usable and Odoo will accept email for it. """ _name = 'mail.alias' _description = "Email Aliases" _rec_name = 'alias_name' _order = 'alias_model_id, alias_name' alias_name = fields.Char('Alias Name', copy=False, help="The name of the email alias, e.g. 'jobs' if you want to catch emails for ") alias_model_id = fields.Many2one('ir.model', 'Aliased Model', required=True, ondelete="cascade", help="The model (Odoo Document Kind) to which this alias " "corresponds. Any incoming email that does not reply to an " "existing record will cause the creation of a new record " "of this model (e.g. a Project Task)", # hack to only allow selecting mail_thread models (we might # (have a few false positives, though) domain="[('field_id.name', '=', 'message_ids')]") alias_user_id = fields.Many2one('res.users', 'Owner', default=lambda self: self.env.user, help="The owner of records created upon receiving emails on this alias. " "If this field is not set the system will attempt to find the right owner " "based on the sender (From) address, or will use the Administrator account " "if no system user is found for that address.") alias_defaults = fields.Text('Default Values', required=True, default='{}', help="A Python dictionary that will be evaluated to provide " "default values when creating new records for this alias.") alias_force_thread_id = fields.Integer( 'Record Thread ID', help="Optional ID of a thread (record) to which all incoming messages will be attached, even " "if they did not reply to it. If set, this will disable the creation of new records completely.") alias_domain = fields.Char('Alias domain', compute='_compute_alias_domain') alias_parent_model_id = fields.Many2one( 'ir.model', 'Parent Model', help="Parent model holding the alias. The model holding the alias reference " "is not necessarily the model given by alias_model_id " "(example: project (parent_model) and task (model))") alias_parent_thread_id = fields.Integer('Parent Record Thread ID', help="ID of the parent record holding the alias (example: project holding the task creation alias)") alias_contact = fields.Selection([ ('everyone', 'Everyone'), ('partners', 'Authenticated Partners'), ('followers', 'Followers only')], default='everyone', string='Alias Contact Security', required=True, help="Policy to post a message on the document using the mailgateway.\n" "- everyone: everyone can post\n" "- partners: only authenticated partners\n" "- followers: only followers of the related document or members of following channels\n") alias_bounced_content = fields.Html( "Custom Bounced Message", translate=True, help="If set, this content will automatically be sent out to unauthorized users instead of the default message.") _sql_constraints = [ ('alias_unique', 'UNIQUE(alias_name)', 'Unfortunately this email alias is already used, please choose a unique one') ] @api.constrains('alias_name') def _alias_is_ascii(self): """ The local-part ("display-name" ) of an address only contains limited range of ascii characters. We DO NOT allow anything else than ASCII dot-atom formed local-part. Quoted-string and internationnal characters are to be rejected. See rfc5322 sections 3.4.1 and 3.2.3 """ for alias in self: if alias.alias_name and not dot_atom_text.match(alias.alias_name): raise ValidationError(_( "You cannot use anything else than unaccented latin characters in the alias address (%s).", alias.alias_name, )) @api.depends('alias_name') def _compute_alias_domain(self): self.alias_domain = self.env["ir.config_parameter"].sudo().get_param("mail.catchall.domain") @api.constrains('alias_defaults') def _check_alias_defaults(self): for alias in self: try: dict(ast.literal_eval(alias.alias_defaults)) except Exception: raise ValidationError(_('Invalid expression, it must be a literal python dictionary definition e.g. "{\'field\': \'value\'}"')) @api.model_create_multi def create(self, vals_list): """ Creates email.alias records according to the values provided in ``vals`` with 1 alteration: * ``alias_name`` value may be cleaned by replacing certain unsafe characters; :raise UserError: if given alias_name is already assigned or there are duplicates in given vals_list; """ alias_names = [vals['alias_name'] for vals in vals_list if vals.get('alias_name')] if alias_names: sanitized_names = self._clean_and_check_unique(alias_names) for vals in vals_list: if vals.get('alias_name'): vals['alias_name'] = sanitized_names[alias_names.index(vals['alias_name'])] return super(Alias, self).create(vals_list) def write(self, vals): """"Raises UserError if given alias name is already assigned""" if vals.get('alias_name') and self.ids: if len(self) > 1: raise UserError(_( 'Email alias %(alias_name)s cannot be used on %(count)d records at the same time. Please update records one by one.', alias_name=vals['alias_name'], count=len(self) )) vals['alias_name'] = self._clean_and_check_unique([vals.get('alias_name')])[0] return super(Alias, self).write(vals) def name_get(self): """Return the mail alias display alias_name, including the implicit mail catchall domain if exists from config otherwise "New Alias". e.g. `jobs@mail.odoo.com` or `jobs` or 'New Alias' """ res = [] for record in self: if record.alias_name and record.alias_domain: res.append((record['id'], "%s@%s" % (record.alias_name, record.alias_domain))) elif record.alias_name: res.append((record['id'], "%s" % (record.alias_name))) else: res.append((record['id'], _("Inactive Alias"))) return res def _clean_and_check_mail_catchall_allowed_domains(self, value): """ The purpose of this system parameter is to avoid the creation of records from incoming emails with a domain != alias_domain but that have a pattern matching an internal mail.alias . """ value = [domain.strip().lower() for domain in value.split(',') if domain.strip()] if not value: raise ValidationError(_("Value for `mail.catchall.domain.allowed` cannot be validated.\n" "It should be a comma separated list of domains e.g. example.com,example.org.")) return ",".join(value) def _clean_and_check_unique(self, names): """When an alias name appears to already be an email, we keep the local part only. A sanitizing / cleaning is also performed on the name. If name already exists an UserError is raised. """ def _sanitize_alias_name(name): """ Cleans and sanitizes the alias name """ sanitized_name = remove_accents(name).lower().split('@')[0] sanitized_name = re.sub(r'[^\w+.]+', '-', sanitized_name) sanitized_name = re.sub(r'^\.+|\.+$|\.+(?=\.)', '', sanitized_name) sanitized_name = sanitized_name.encode('ascii', errors='replace').decode() return sanitized_name sanitized_names = [_sanitize_alias_name(name) for name in names] catchall_alias = self.env['ir.config_parameter'].sudo().get_param('mail.catchall.alias') bounce_alias = self.env['ir.config_parameter'].sudo().get_param('mail.bounce.alias') alias_domain = self.env["ir.config_parameter"].sudo().get_param("mail.catchall.domain") # matches catchall or bounce alias for sanitized_name in sanitized_names: if sanitized_name in [catchall_alias, bounce_alias]: matching_alias_name = '%s@%s' % (sanitized_name, alias_domain) if alias_domain else sanitized_name raise UserError( _('The e-mail alias %(matching_alias_name)s is already used as %(alias_duplicate)s alias. Please choose another alias.', matching_alias_name=matching_alias_name, alias_duplicate=_('catchall') if sanitized_name == catchall_alias else _('bounce')) ) # matches existing alias domain = [('alias_name', 'in', sanitized_names)] if self: domain += [('id', 'not in', self.ids)] matching_alias = self.search(domain, limit=1) if not matching_alias: return sanitized_names sanitized_alias_name = _sanitize_alias_name(matching_alias.alias_name) matching_alias_name = '%s@%s' % (sanitized_alias_name, alias_domain) if alias_domain else sanitized_alias_name if matching_alias.alias_parent_model_id and matching_alias.alias_parent_thread_id: # If parent model and parent thread ID both are set, display document name also in the warning document_name = self.env[matching_alias.alias_parent_model_id.model].sudo().browse(matching_alias.alias_parent_thread_id).display_name raise UserError( _('The e-mail alias %(matching_alias_name)s is already used by the %(document_name)s %(model_name)s. Choose another alias or change it on the other document.', matching_alias_name=matching_alias_name, document_name=document_name, model_name=matching_alias.alias_parent_model_id.name) ) raise UserError( _('The e-mail alias %(matching_alias_name)s is already linked with %(alias_model_name)s. Choose another alias or change it on the linked model.', matching_alias_name=matching_alias_name, alias_model_name=matching_alias.alias_model_id.name) ) def open_document(self): if not self.alias_model_id or not self.alias_force_thread_id: return False return { 'view_mode': 'form', 'res_model': self.alias_model_id.model, 'res_id': self.alias_force_thread_id, 'type': 'ir.actions.act_window', } def open_parent_document(self): if not self.alias_parent_model_id or not self.alias_parent_thread_id: return False return { 'view_mode': 'form', 'res_model': self.alias_parent_model_id.model, 'res_id': self.alias_parent_thread_id, 'type': 'ir.actions.act_window', } def _get_alias_bounced_body_fallback(self, message_dict): contact_description = self._get_alias_contact_description() default_email = self.env.company.partner_id.email_formatted if self.env.company.partner_id.email else self.env.company.name return Markup( _("""

Dear Sender,

The message below could not be accepted by the address %(alias_display_name)s. Only %(contact_description)s are allowed to contact it.

Please make sure you are using the correct address or contact us at %(default_email)s instead.

Kind Regards,

""" )) % { 'alias_display_name': self.display_name, 'contact_description': contact_description, 'default_email': default_email, } def _get_alias_contact_description(self): if self.alias_contact == 'partners': return _('addresses linked to registered partners') return _('some specific addresses') def _get_alias_bounced_body(self, message_dict): """Get the body of the email return in case of bounced email. :param message_dict: dictionary of mail values """ lang_author = False if message_dict.get('author_id'): try: lang_author = self.env['res.partner'].browse(message_dict['author_id']).lang except: pass if lang_author: self = self.with_context(lang=lang_author) if not is_html_empty(self.alias_bounced_content): body = self.alias_bounced_content else: body = self._get_alias_bounced_body_fallback(message_dict) return self.env['ir.qweb']._render('mail.mail_bounce_alias_security', { 'body': body, 'message': message_dict }, minimal_qcontext=True)