# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from datetime import datetime
import logging
import pytz

from odoo import api, fields, models
from odoo.osv import expression

_logger = logging.getLogger(__name__)


class MailActivityMixin(models.AbstractModel):
    """ Mail Activity Mixin is a mixin class to use if you want to add activities
    management on a model. It works like the mail.thread mixin. It defines
    an activity_ids one2many field toward activities using res_id and res_model_id.
    Various related / computed fields are also added to have a global status of
    activities on documents.

    Activities come with a new JS widget for the form view. It is integrated in the
    Chatter widget although it is a separate widget. It displays activities linked
    to the current record and allows to schedule, edit and mark done activities.
    Just include field activity_ids in the div.oe-chatter to use it.

    There is also a kanban widget defined. It defines a small widget to integrate
    in kanban vignettes. It allows to manage activities directly from the kanban
    view. Use widget="kanban_activity" on activity_ids field in kanban view to
    use it.

    Some context keys allow to control the mixin behavior. Use those in some
    specific cases like import

     * ``mail_activity_automation_skip``: skip activities automation; it means
       no automated activities will be generated, updated or unlinked, allowing
       to save computation and avoid generating unwanted activities;
    """
    _name = 'mail.activity.mixin'
    _description = 'Activity Mixin'

    def _default_activity_type(self):
        """Define a default fallback activity type when requested xml id wasn't found.

        Can be overridden to specify the default activity type of a model.
        It is only called in activity_schedule() for now.
        """
        return self.env['mail.activity']._default_activity_type_for_model(self._name)

    activity_ids = fields.One2many(
        'mail.activity', 'res_id', 'Activities',
        auto_join=True,
        groups="base.group_user",)
    activity_state = fields.Selection([
        ('overdue', 'Overdue'),
        ('today', 'Today'),
        ('planned', 'Planned')], string='Activity State',
        compute='_compute_activity_state',
        search='_search_activity_state',
        groups="base.group_user",
        help='Status based on activities\nOverdue: Due date is already passed\n'
             'Today: Activity date is today\nPlanned: Future activities.')
    activity_user_id = fields.Many2one(
        'res.users', 'Responsible User',
        related='activity_ids.user_id', readonly=False,
        search='_search_activity_user_id',
        groups="base.group_user")
    activity_type_id = fields.Many2one(
        'mail.activity.type', 'Next Activity Type',
        related='activity_ids.activity_type_id', readonly=False,
        search='_search_activity_type_id',
        groups="base.group_user")
    activity_type_icon = fields.Char('Activity Type Icon', related='activity_ids.icon')
    activity_date_deadline = fields.Date(
        'Next Activity Deadline',
        compute='_compute_activity_date_deadline', search='_search_activity_date_deadline',
        compute_sudo=False, readonly=True, store=False,
        groups="base.group_user")
    my_activity_date_deadline = fields.Date(
        'My Activity Deadline',
        compute='_compute_my_activity_date_deadline', search='_search_my_activity_date_deadline',
        compute_sudo=False, readonly=True, groups="base.group_user")
    activity_summary = fields.Char(
        'Next Activity Summary',
        related='activity_ids.summary', readonly=False,
        search='_search_activity_summary',
        groups="base.group_user",)
    activity_exception_decoration = fields.Selection([
        ('warning', 'Alert'),
        ('danger', 'Error')],
        compute='_compute_activity_exception_type',
        search='_search_activity_exception_decoration',
        help="Type of the exception activity on record.")
    activity_exception_icon = fields.Char('Icon', help="Icon to indicate an exception activity.",
        compute='_compute_activity_exception_type')

    @api.depends('activity_ids.activity_type_id.decoration_type', 'activity_ids.activity_type_id.icon')
    def _compute_activity_exception_type(self):
        """ Compute the exception decoration and icon of each record.

        The first activity type decorated as 'danger' wins; otherwise the last
        activity type decorated as 'warning' is used. Both fields are falsy
        when no activity type carries one of those decorations.
        """
        # prefetch all activity types for all activities, this will avoid any query in loops
        self.mapped('activity_ids.activity_type_id.decoration_type')

        for record in self:
            activity_type_ids = record.activity_ids.mapped('activity_type_id')
            exception_activity_type_id = False
            for activity_type_id in activity_type_ids:
                if activity_type_id.decoration_type == 'danger':
                    # 'danger' has priority: no need to look any further
                    exception_activity_type_id = activity_type_id
                    break
                if activity_type_id.decoration_type == 'warning':
                    exception_activity_type_id = activity_type_id
            record.activity_exception_decoration = exception_activity_type_id and exception_activity_type_id.decoration_type
            record.activity_exception_icon = exception_activity_type_id and exception_activity_type_id.icon

    def _search_activity_exception_decoration(self, operator, operand):
        """ Search on the decoration type of the activity types of the activities. """
        return [('activity_ids.activity_type_id.decoration_type', operator, operand)]

    @api.depends('activity_ids.state')
    def _compute_activity_state(self):
        """ Compute the global state of each record from its activities states,
        by priority: 'overdue', then 'today', then 'planned', else False.
        """
        for record in self:
            states = record.activity_ids.mapped('state')
            if 'overdue' in states:
                record.activity_state = 'overdue'
            elif 'today' in states:
                record.activity_state = 'today'
            elif 'planned' in states:
                record.activity_state = 'planned'
            else:
                record.activity_state = False

    def _search_activity_state(self, operator, value):
        """ Search method of ``activity_state``.

        :param operator: one of '=', '!=', 'in', 'not in'
        :param value: a state ('overdue', 'today', 'planned' or False) for
          '=' / '!=', an iterable of states for 'in' / 'not in'
        :return: a domain on ``id``
        :raise ValueError: if the operator is not supported
        """
        all_states = {'overdue', 'today', 'planned', False}
        if operator == '=':
            search_states = {value}
        elif operator == '!=':
            search_states = all_states - {value}
        elif operator == 'in':
            search_states = set(value)
        elif operator == 'not in':
            search_states = all_states - set(value)
        else:
            # fail explicitly instead of hitting an unbound local variable below
            raise ValueError("Unsupported search operator %r on activity_state" % (operator,))

        reverse_search = False
        if False in search_states:
            # If we search "activity_state = False", they might be a lot of records
            # (million for some models), so instead of returning the list of IDs
            # [(id, 'in', ids)] we will reverse the domain and return something like
            # [(id, 'not in', ids)], so the list of ids is as small as possible
            reverse_search = True
            search_states = all_states - search_states

        # Use number in the SQL query for performance purpose
        integer_state_value = {
            'overdue': -1,
            'today': 0,
            'planned': 1,
            False: None,
        }

        search_states_int = {integer_state_value.get(s or False) for s in search_states}

        query = """
          SELECT res_id
            FROM (
                SELECT res_id,
                       -- Global activity state
                       MIN(
                            -- Compute the state of each individual activities
                            -- -1: overdue
                            --  0: today
                            --  1: planned
                           SIGN(EXTRACT(day from (
                                mail_activity.date_deadline - DATE_TRUNC('day', %(today_utc)s AT TIME ZONE res_partner.tz)
                           )))
                        )::INT AS activity_state
                  FROM mail_activity
             LEFT JOIN res_users
                    ON res_users.id = mail_activity.user_id
             LEFT JOIN res_partner
                    ON res_partner.id = res_users.partner_id
                 WHERE mail_activity.res_model = %(res_model_table)s
              GROUP BY res_id
            ) AS res_record
          WHERE %(search_states_int)s @> ARRAY[activity_state]
        """

        self._cr.execute(
            query,
            {
                'today_utc': pytz.utc.localize(datetime.utcnow()),
                'res_model_table': self._name,
                'search_states_int': list(search_states_int)
            },
        )
        return [('id', 'not in' if reverse_search else 'in', [r[0] for r in self._cr.fetchall()])]

    @api.depends('activity_ids.date_deadline')
    def _compute_activity_date_deadline(self):
        """ Set the deadline of each record to the deadline of its first activity. """
        for record in self:
            record.activity_date_deadline = fields.first(record.activity_ids).date_deadline

    def _search_activity_date_deadline(self, operator, operand):
        """ Search on activities deadline; '= False' matches records without activity. """
        if operator == '=' and not operand:
            return [('activity_ids', '=', False)]
        return [('activity_ids.date_deadline', operator, operand)]

    @api.model
    def _search_activity_user_id(self, operator, operand):
        """ Search on the responsible user of the activities. """
        return [('activity_ids.user_id', operator, operand)]

    @api.model
    def _search_activity_type_id(self, operator, operand):
        """ Search on the type of the activities. """
        return [('activity_ids.activity_type_id', operator, operand)]

    @api.model
    def _search_activity_summary(self, operator, operand):
        """ Search on the summary of the activities. """
        return [('activity_ids.summary', operator, operand)]

    @api.depends('activity_ids.date_deadline', 'activity_ids.user_id')
    @api.depends_context('uid')
    def _compute_my_activity_date_deadline(self):
        """ Set the deadline of the first activity assigned to the current user,
        or False when the current user has no activity on the record.
        """
        for record in self:
            record.my_activity_date_deadline = next((
                activity.date_deadline
                for activity in record.activity_ids
                if activity.user_id.id == record.env.uid
            ), False)

    def _search_my_activity_date_deadline(self, operator, operand):
        """ Search on the deadline of the activities of this model that are
        assigned to the current user.
        """
        activity_ids = self.env['mail.activity']._search([
            ('date_deadline', operator, operand),
            ('res_model', '=', self._name),
            ('user_id', '=', self.env.user.id)
        ])
        return [('activity_ids', 'in', activity_ids)]

    def write(self, vals):
        """ Override write to remove the activities of records being archived. """
        # Delete activities of archived record.
        if 'active' in vals and vals['active'] is False:
            self.env['mail.activity'].sudo().search(
                [('res_model', '=', self._name), ('res_id', 'in', self.ids)]
            ).unlink()
        return super().write(vals)

    def unlink(self):
        """ Override unlink to delete records activities through (res_model, res_id). """
        # keep the ids: they are needed to find the activities once records are deleted
        record_ids = self.ids
        result = super().unlink()
        self.env['mail.activity'].sudo().search(
            [('res_model', '=', self._name), ('res_id', 'in', record_ids)]
        ).unlink()
        return result

    def _read_progress_bar(self, domain, group_by, progress_bar):
        """ Override with a dedicated SQL query when the progress bar is based
        on ``activity_state`` and the group by field is stored; any other case
        is delegated to the parent implementation.
        """
        group_by_fname = group_by.partition(':')[0]
        if not (progress_bar['field'] == 'activity_state' and self._fields[group_by_fname].store):
            return super()._read_progress_bar(domain, group_by, progress_bar)

        # optimization for 'activity_state'

        # explicitly check access rights, since we bypass the ORM
        self.check_access_rights('read')
        self._flush_search(domain, fields=[group_by_fname], order='id')
        self.env['mail.activity'].flush_model(['res_model', 'res_id', 'user_id', 'date_deadline'])
        self.env['res.users'].flush_model(['partner_id'])
        self.env['res.partner'].flush_model(['tz'])
        query = self._where_calc(domain)
        self._apply_ir_rules(query, 'read')
        annotated_groupbys = [
            self._read_group_process_groupby(gb, query)
            for gb in [group_by, 'activity_state']
        ]
        groupby_dict = {gb['groupby']: gb for gb in annotated_groupbys}
        for gb in annotated_groupbys:
            if gb['field'] == 'activity_state':
                # the state is not a column of the table: read it from the joined subquery
                gb['qualified_field'] = '"_last_activity_state"."activity_state"'
        groupby_terms, _orderby_terms = self._read_group_prepare('activity_state', [], annotated_groupbys, query)
        select_terms = [
            '%s as "%s"' % (gb['qualified_field'], gb['groupby'])
            for gb in annotated_groupbys
        ]
        from_clause, where_clause, where_params = query.get_sql()
        tz = self._context.get('tz') or self.env.user.tz or 'UTC'
        select_query = """
            SELECT 1 AS id, count(*) AS "__count", {fields}
            FROM {from_clause}
            JOIN (
                SELECT res_id,
                CASE
                    WHEN min(date_deadline - (now() AT TIME ZONE COALESCE(res_partner.tz, %s))::date) > 0 THEN 'planned'
                    WHEN min(date_deadline - (now() AT TIME ZONE COALESCE(res_partner.tz, %s))::date) < 0 THEN 'overdue'
                    WHEN min(date_deadline - (now() AT TIME ZONE COALESCE(res_partner.tz, %s))::date) = 0 THEN 'today'
                    ELSE null
                END AS activity_state
                FROM mail_activity
                JOIN res_users ON (res_users.id = mail_activity.user_id)
                JOIN res_partner ON (res_partner.id = res_users.partner_id)
                WHERE res_model = '{model}'
                GROUP BY res_id
            ) AS "_last_activity_state" ON ("{table}".id = "_last_activity_state".res_id)
            WHERE {where_clause}
            GROUP BY {group_by}
        """.format(
            fields=', '.join(select_terms),
            from_clause=from_clause,
            model=self._name,
            table=self._table,
            where_clause=where_clause or '1=1',
            group_by=', '.join(groupby_terms),
        )
        num_from_params = from_clause.count('%s')
        where_params[num_from_params:num_from_params] = [tz] * 3  # timezone after from parameters
        self.env.cr.execute(select_query, where_params)
        fetched_data = self.env.cr.dictfetchall()
        self._read_group_resolve_many2x_fields(fetched_data, annotated_groupbys)
        data = [
            {key: self._read_group_prepare_data(key, val, groupby_dict)
             for key, val in row.items()}
            for row in fetched_data
        ]
        return [
            self._read_group_format_result(vals, annotated_groupbys, [group_by], domain)
            for vals in data
        ]

    def toggle_active(self):
        """ Before archiving the record we should also remove its ongoing
        activities. Otherwise they stay in the systray and concerning archived
        records it makes no sense. """
        record_to_deactivate = self.filtered(lambda rec: rec[rec._active_name])
        if record_to_deactivate:
            # use a sudo to bypass every access rights; all activities should be removed
            self.env['mail.activity'].sudo().search([
                ('res_model', '=', self._name),
                ('res_id', 'in', record_to_deactivate.ids)
            ]).unlink()
        return super().toggle_active()

    def activity_send_mail(self, template_id):
        """ Automatically send an email based on the given mail.template, given
        its ID.

        :param template_id: ID of the mail.template to use
        :return: False if the template does not exist, True otherwise
        """
        template = self.env['mail.template'].browse(template_id).exists()
        if not template:
            return False
        for record in self:
            record.message_post_with_template(
                template_id,
                composition_mode='comment'
            )
        return True

    def activity_search(self, act_type_xmlids='', user_id=None, additional_domain=None):
        """ Search automated activities on current record set, given a list of activity
        types xml IDs. It is useful when dealing with specific types involved in automatic
        activities management.

        :param act_type_xmlids: list of activity types xml IDs
        :param user_id: if set, restrict to activities of that user_id;
        :param additional_domain: if set, filter on that domain;
        :return: matching mail.activity records; an empty recordset when the
          automation is skipped by context or no xml ID could be resolved
        """
        if self.env.context.get('mail_activity_automation_skip'):
            return self.env['mail.activity']

        Data = self.env['ir.model.data'].sudo()
        activity_types_ids = [type_id for type_id in (Data._xmlid_to_res_id(xmlid, raise_if_not_found=False) for xmlid in act_type_xmlids) if type_id]
        if not any(activity_types_ids):
            return self.env['mail.activity']

        domain = [
            '&', '&', '&',
            ('res_model', '=', self._name),
            ('res_id', 'in', self.ids),
            ('automated', '=', True),
            ('activity_type_id', 'in', activity_types_ids)
        ]

        if user_id:
            domain = expression.AND([domain, [('user_id', '=', user_id)]])
        if additional_domain:
            domain = expression.AND([domain, additional_domain])

        return self.env['mail.activity'].search(domain)

    def activity_schedule(self, act_type_xmlid='', date_deadline=None, summary='', note='', **act_values):
        """ Schedule an activity on each record of the current record set.
        This method allows to provide as parameter act_type_xmlid. This is an
        xml_id of activity type instead of directly giving an activity_type_id.
        It is useful to avoid having various "env.ref" in the code and allows
        to let the mixin handle access rights.

        :param act_type_xmlid: xml ID of the activity type; when it cannot be
          resolved the type given by ``_default_activity_type`` is used. When
          empty, ``activity_type_id`` is read from ``act_values`` instead
        :param date_deadline: the day the activity must be scheduled on
          the timezone of the user must be considered to set the correct deadline
        :param summary: summary of the activities, defaults to the one of the type
        :param note: note of the activities, defaults to the default note of the type
        :param act_values: additional values used to create the activities
        :return: created mail.activity records, or False when the automation is
          skipped by context
        """
        if self.env.context.get('mail_activity_automation_skip'):
            return False

        if not date_deadline:
            date_deadline = fields.Date.context_today(self)
        if isinstance(date_deadline, datetime):
            _logger.warning("Scheduled deadline should be a date (got %s)", date_deadline)
        if act_type_xmlid:
            activity_type_id = self.env['ir.model.data']._xmlid_to_res_id(act_type_xmlid, raise_if_not_found=False)
            if activity_type_id:
                activity_type = self.env['mail.activity.type'].browse(activity_type_id)
            else:
                activity_type = self._default_activity_type()
        else:
            activity_type_id = act_values.get('activity_type_id', False)
            activity_type = self.env['mail.activity.type'].browse(activity_type_id) if activity_type_id else self.env['mail.activity.type']

        model_id = self.env['ir.model']._get(self._name).id
        create_vals_list = []
        for record in self:
            create_vals = {
                'activity_type_id': activity_type.id,
                'summary': summary or activity_type.summary,
                'automated': True,
                'note': note or activity_type.default_note,
                'date_deadline': date_deadline,
                'res_model_id': model_id,
                'res_id': record.id,
            }
            # explicit values given by the caller override the computed ones
            create_vals.update(act_values)
            if not create_vals.get('user_id'):
                create_vals['user_id'] = activity_type.default_user_id.id or self.env.uid
            create_vals_list.append(create_vals)
        return self.env['mail.activity'].create(create_vals_list)

    def _activity_schedule_with_view(self, act_type_xmlid='', date_deadline=None, summary='', views_or_xmlid='', render_context=None, **act_values):
        """ Helper method: Schedule an activity on each record of the current record set.
        This method uses the same mechanism as `activity_schedule`, but provides
        2 additional parameters:

        :param views_or_xmlid: record of ir.ui.view or string representing the xmlid
          of the qweb template to render
        :type views_or_xmlid: string or recordset
        :param render_context: the values required to render the given qweb template
        :type render_context: dict
        :return: created mail.activity records, or False when the automation is
          skipped by context
        """
        if self.env.context.get('mail_activity_automation_skip'):
            return False

        view_ref = views_or_xmlid.id if isinstance(views_or_xmlid, models.BaseModel) else views_or_xmlid
        # work on a copy: the 'object' key set below must not leak into the caller's dict
        render_context = dict(render_context or {})
        activities = self.env['mail.activity']
        for record in self:
            render_context['object'] = record
            note = self.env['ir.qweb']._render(view_ref, render_context, minimal_qcontext=True, raise_if_not_found=False)
            activities += record.activity_schedule(act_type_xmlid=act_type_xmlid, date_deadline=date_deadline, summary=summary, note=note, **act_values)
        return activities

    def activity_reschedule(self, act_type_xmlids, user_id=None, date_deadline=None, new_user_id=None):
        """ Reschedule some automated activities. Activities to reschedule are
        selected based on type xml ids and optionally by user. Purpose is to be
        able to

         * update the deadline to date_deadline;
         * update the responsible to new_user_id;

        :return: the activities found, or False when the automation is skipped
          by context or no xml ID could be resolved
        """
        if self.env.context.get('mail_activity_automation_skip'):
            return False

        Data = self.env['ir.model.data'].sudo()
        activity_types_ids = [Data._xmlid_to_res_id(xmlid, raise_if_not_found=False) for xmlid in act_type_xmlids]
        activity_types_ids = [act_type_id for act_type_id in activity_types_ids if act_type_id]
        if not activity_types_ids:
            return False
        activities = self.activity_search(act_type_xmlids, user_id=user_id)
        if activities:
            write_vals = {}
            if date_deadline:
                write_vals['date_deadline'] = date_deadline
            if new_user_id:
                write_vals['user_id'] = new_user_id
            activities.write(write_vals)
        return activities

    def activity_feedback(self, act_type_xmlids, user_id=None, feedback=None, attachment_ids=None):
        """ Set activities as done, limiting to some activity types and
        optionally to a given user.

        :return: True, or False when the automation is skipped by context or no
          xml ID could be resolved
        """
        if self.env.context.get('mail_activity_automation_skip'):
            return False

        Data = self.env['ir.model.data'].sudo()
        activity_types_ids = [Data._xmlid_to_res_id(xmlid, raise_if_not_found=False) for xmlid in act_type_xmlids]
        activity_types_ids = [act_type_id for act_type_id in activity_types_ids if act_type_id]
        if not activity_types_ids:
            return False
        activities = self.activity_search(act_type_xmlids, user_id=user_id)
        if activities:
            activities.action_feedback(feedback=feedback, attachment_ids=attachment_ids)
        return True

    def activity_unlink(self, act_type_xmlids, user_id=None):
        """ Unlink activities, limiting to some activity types and optionally
        to a given user.

        :return: True, or False when the automation is skipped by context or no
          xml ID could be resolved
        """
        if self.env.context.get('mail_activity_automation_skip'):
            return False

        Data = self.env['ir.model.data'].sudo()
        activity_types_ids = [Data._xmlid_to_res_id(xmlid, raise_if_not_found=False) for xmlid in act_type_xmlids]
        activity_types_ids = [act_type_id for act_type_id in activity_types_ids if act_type_id]
        if not activity_types_ids:
            return False
        self.activity_search(act_type_xmlids, user_id=user_id).unlink()
        return True