# 2025-01-06 10:57:38 +07:00
# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import re
from pytz import UTC
from collections import defaultdict
from datetime import timedelta , datetime , time
from odoo import api , Command , fields , models , tools , SUPERUSER_ID , _
from odoo . addons . rating . models import rating_data
from odoo . addons . web_editor . tools import handle_history_divergence
from odoo . exceptions import UserError , ValidationError , AccessError
from odoo . osv import expression
from odoo . tools import format_list , SQL
from odoo . addons . resource . models . utils import filter_domain_leaf
from odoo . addons . project . controllers . project_sharing_chatter import ProjectSharingChatter
from odoo . addons . mail . tools . discuss import Store
' id ' ,
' active ' ,
' priority ' ,
' project_id ' ,
' display_in_project ' ,
' color ' ,
' allow_task_dependencies ' ,
' subtask_count ' ,
' email_from ' ,
' create_date ' ,
' write_date ' ,
' company_id ' ,
' displayed_image_id ' ,
' display_name ' ,
' portal_user_names ' ,
' user_ids ' ,
' display_parent_task_button ' ,
' current_user_same_company_partner ' ,
' allow_milestones ' ,
' milestone_id ' ,
' has_late_and_unreached_milestone ' ,
' date_assign ' ,
' dependent_ids ' ,
' message_is_follower ' ,
' recurring_task ' ,
' closed_subtask_count ' ,
' dependent_tasks_count ' ,
' depend_on_ids ' ,
' depend_on_count ' ,
' repeat_interval ' ,
' repeat_unit ' ,
' repeat_type ' ,
' repeat_until ' ,
' recurrence_id ' ,
' recurring_count ' ,
' duration_tracking ' ,
' display_follow_button ' ,
' name ' ,
' description ' ,
' partner_id ' ,
' date_deadline ' ,
' date_last_stage_update ' ,
' tag_ids ' ,
' sequence ' ,
' stage_id ' ,
' child_ids ' ,
' parent_id ' ,
' priority ' ,
' state ' ,
' is_closed ' ,
# Technical keys of the task states considered closed, mapped to their labels.
# The entries were dangling `key: value` lines with no enclosing dict, while the
# model below reads `CLOSED_STATES.items()` and `CLOSED_STATES.keys()`.
CLOSED_STATES = {
    ' 1_done ': ' Done ',
    ' 1_canceled ': ' Cancelled ',
}
class Task ( models . Model ) :
# Model identity: technical name, description, and the datetime field used as
# the model's date name.
_name = " project.task "
_description = " Task "
_date_name = " date_assign "
# Mixins this model inherits from. The closing bracket of this list was
# missing, which pulled the following assignments into the list literal.
_inherit = [
    ' portal.mixin ',
    ' mail.thread.cc ',
    ' mail.activity.mixin ',
    ' rating.mixin ',
    ' mail.tracking.duration.mixin ',
    ' html.field.history.mixin ',
]
_mail_post_access = ' read '
_order = " priority desc, sequence, date_deadline asc, id desc "
_primary_email = ' email_from '
_systray_view = ' activity '
# Field whose changes are duration-tracked (see ' mail.tracking.duration.mixin ').
_track_duration_field = ' stage_id '
def _get_versioned_fields(self):
    """ Return a one-element list holding the name of the task `description` field.

    NOTE(review): presumably consumed by ' html.field.history.mixin ' to know
    which fields keep a history - confirm against the mixin.
    """
    versioned_field = Task.description
    return [versioned_field.name]
def _get_default_partner_id ( self , project = None , parent = None ) :
if parent and parent . partner_id :
return parent . partner_id . id
if project and project . partner_id :
return project . partner_id . id
return False
def _get_default_stage_id ( self ) :
""" Gives default stage_id """
project_id = self . env . context . get ( ' default_project_id ' )
if not project_id :
return False
return self . stage_find ( project_id , order = " fold, sequence, id " )
def _default_personal_stage_type_id ( self ) :
default_id = self . env . context . get ( ' default_personal_stage_type_ids ' )
return ( default_id or self . env [ ' project.task.type ' ] . search ( [ ( ' user_id ' , ' = ' , self . env . user . id ) ] , limit = 1 ) . ids or [ False ] ) [ 0 ]
def _default_user_ids ( self ) :
return self . env . context . keys ( ) & { ' default_personal_stage_type_ids ' , ' default_personal_stage_type_id ' } and self . env . user
def _default_company_id ( self ) :
if self . _context . get ( ' default_project_id ' ) :
return self . env [ ' project.project ' ] . browse ( self . _context [ ' default_project_id ' ] ) . company_id
return False
def _read_group_stage_ids ( self , stages , domain ) :
search_domain = [ ( ' id ' , ' in ' , stages . ids ) ]
2025-03-04 12:23:19 +07:00
if ' default_project_id ' in self . env . context and not self . _context . get ( ' subtask_action ' ) and ' project_kanban ' in self . env . context :
2025-01-06 10:57:38 +07:00
search_domain = [ ' | ' , ( ' project_ids ' , ' = ' , self . env . context [ ' default_project_id ' ] ) ] + search_domain
stage_ids = stages . sudo ( ) . _search ( search_domain , order = stages . _order )
return stages . browse ( stage_ids )
def _read_group_personal_stage_type_ids ( self , stages , domain ) :
return stages . search ( [ ' | ' , ( ' id ' , ' in ' , stages . ids ) , ( ' user_id ' , ' = ' , self . env . user . id ) ] )
# Core task fields: title, description, priority, stage, state, dates and project link.
active = fields . Boolean ( default = True , export_string_translation = False )
name = fields . Char ( string = ' Title ' , tracking = True , required = True , index = ' trigram ' )
description = fields . Html ( string = ' Description ' , sanitize_attributes = False )
priority = fields . Selection ( [
( ' 0 ' , ' Low ' ) ,
( ' 1 ' , ' High ' ) ,
] , default = ' 0 ' , index = True , string = " Priority " , tracking = True )
sequence = fields . Integer ( string = ' Sequence ' , default = 10 , export_string_translation = False )
# Stored, computed by _compute_stage_id and still editable (readonly=False);
# the kanban columns are expanded through _read_group_stage_ids.
stage_id = fields . Many2one ( ' project.task.type ' , string = ' Stage ' , compute = ' _compute_stage_id ' ,
store = True , readonly = False , ondelete = ' restrict ' , tracking = True , index = True ,
default = _get_default_stage_id , group_expand = ' _read_group_stage_ids ' ,
domain = " [( ' project_ids ' , ' = ' , project_id)] " )
tag_ids = fields . Many2many ( ' project.tags ' , string = ' Tags ' )
# The CLOSED_STATES entries are unpacked between the open states and the waiting state.
state = fields . Selection ( [
( ' 01_in_progress ' , ' In Progress ' ) ,
( ' 02_changes_requested ' , ' Changes Requested ' ) ,
( ' 03_approved ' , ' Approved ' ) ,
* CLOSED_STATES . items ( ) ,
( ' 04_waiting_normal ' , ' Waiting ' ) ,
] , string = ' State ' , copy = False , default = ' 01_in_progress ' , required = True , compute = ' _compute_state ' , inverse = ' _inverse_state ' , readonly = False , store = True , index = True , recursive = True , tracking = True )
# Non-stored flag derived from `state`; searchable through _search_is_closed.
is_closed = fields . Boolean ( " Closed state " , compute = ' _compute_is_closed ' , search = ' _search_is_closed ' )
create_date = fields . Datetime ( " Created On " , readonly = True , index = True )
write_date = fields . Datetime ( " Last Updated On " , readonly = True )
date_end = fields . Datetime ( string = ' Ending Date ' , index = True , copy = False )
date_assign = fields . Datetime ( string = ' Assigning Date ' , copy = False , readonly = True ,
help = " Date on which this task was last assigned (or unassigned). Based on this, you can get statistics on the time it usually takes to assign tasks. " )
date_deadline = fields . Datetime ( string = ' Deadline ' , index = True , tracking = True )
date_last_stage_update = fields . Datetime ( string = ' Last Stage Update ' ,
index = True ,
copy = False ,
readonly = True ,
help = " Date on which the state of your task has last been modified. \n "
" Based on this information you can identify tasks that are stalling and get statistics on the time it usually takes to move tasks from one stage/state to another. " )
# Stored, precomputed by _compute_project_id and still editable (readonly=False).
project_id = fields . Many2one ( ' project.project ' , string = ' Project ' , domain = " [ ' | ' , ( ' company_id ' , ' = ' , False), ( ' company_id ' , ' =? ' , company_id)] " ,
compute = " _compute_project_id " , store = True , precompute = True , recursive = True , readonly = False , index = True , tracking = True , change_default = True )
display_in_project = fields . Boolean ( compute = ' _compute_display_in_project ' , store = True , readonly = False , export_string_translation = False )
# Technical field to display the 'Display in Project' button in the form view, depending on the project
show_display_in_project = fields . Boolean ( compute = ' _compute_show_display_in_project ' )
# Properties whose definition is read from the project (project_id.task_properties_definition).
task_properties = fields . Properties ( ' Properties ' , definition = ' project_id.task_properties_definition ' , copy = True )
allocated_hours = fields . Float ( " Allocated Time " , tracking = True )
subtask_allocated_hours = fields . Float ( " Sub-tasks Allocated Time " , compute = ' _compute_subtask_allocated_hours ' , export_string_translation = False ,
help = " Sum of the hours allocated for all the sub-tasks (and their own sub-tasks) linked to this task. Usually less than or equal to the allocated hours of this task. " )
# Tracking of this field is done in the write function
user_ids = fields . Many2many ( ' res.users ' , relation = ' project_task_user_rel ' , column1 = ' task_id ' , column2 = ' user_id ' ,
string = ' Assignees ' , context = { ' active_test ' : False } , tracking = True , default = _default_user_ids , domain = " [( ' share ' , ' = ' , False), ( ' active ' , ' = ' , True)] " )
# User names displayed in project sharing views
portal_user_names = fields . Char ( compute = ' _compute_portal_user_names ' , compute_sudo = True , search = ' _search_portal_user_names ' , export_string_translation = False )
# Second Many2many containing the actual personal stage for the current user
# (it shares the ' project_task_user_rel ' relation table with `user_ids`).
# See project_task_stage_personal.py for the model definition
personal_stage_type_ids = fields . Many2many ( ' project.task.type ' , ' project_task_user_rel ' , column1 = ' task_id ' , column2 = ' stage_id ' ,
ondelete = ' restrict ' , group_expand = ' _read_group_personal_stage_type_ids ' , copy = False ,
domain = " [( ' user_id ' , ' = ' , uid)] " , string = ' Personal Stages ' , export_string_translation = False )
# Personal Stage computed from the user
personal_stage_id = fields . Many2one ( ' project.task.stage.personal ' , string = ' Personal Stage State ' , compute_sudo = False ,
compute = ' _compute_personal_stage_id ' , help = " The current user ' s personal stage. " )
# This field is actually a related field on personal_stage_id.stage_id
# However due to the fact that personal_stage_id is computed, the orm throws out errors
# saying the field cannot be searched.
personal_stage_type_id = fields . Many2one ( ' project.task.type ' , string = ' Personal Stage ' ,
compute = ' _compute_personal_stage_type_id ' , inverse = ' _inverse_personal_stage_type_id ' , store = False ,
search = ' _search_personal_stage_type_id ' , default = _default_personal_stage_type_id ,
help = " The current user ' s personal task stage. " , domain = " [( ' user_id ' , ' = ' , uid)] " )
# Customer of the task: stored, computed by _compute_partner_id and still editable.
partner_id = fields . Many2one ( ' res.partner ' ,
string = ' Customer ' , recursive = True , tracking = True , compute = ' _compute_partner_id ' , store = True , readonly = False ,
domain = " [ ' | ' , ( ' company_id ' , ' =? ' , company_id), ( ' company_id ' , ' = ' , False)] " , )
email_cc = fields . Char ( help = ' Email addresses that were in the CC of the incoming emails from this task and that are not currently linked to an existing customer. ' )
# Stored, computed by _compute_company_id, editable, defaulting to _default_company_id.
company_id = fields . Many2one ( ' res.company ' , string = ' Company ' , compute = ' _compute_company_id ' , store = True , readonly = False , recursive = True , copy = True , default = _default_company_id )
color = fields . Integer ( string = ' Color Index ' , export_string_translation = False )
rating_active = fields . Boolean ( string = ' Project Rating Status ' , related = " project_id.rating_active " )
# Computed (non-stored) attachments of the task, filled by _compute_attachment_ids.
# The closing parenthesis of this field declaration was missing.
attachment_ids = fields.One2many(
    ' ir.attachment ',
    compute=' _compute_attachment_ids ',
    string=" Attachments ",
    export_string_translation=False,
    help=" Attachments that don ' t come from a message ",
)
# In the domain of displayed_image_id, we couldn't use attachment_ids because a one2many is represented as a list of commands so we used res_model & res_id
displayed_image_id = fields . Many2one ( ' ir.attachment ' , domain = " [( ' res_model ' , ' = ' , ' project.task ' ), ( ' res_id ' , ' = ' , id), ( ' mimetype ' , ' ilike ' , ' image ' )] " , string = ' Cover Image ' )
# Task hierarchy: the parent domain excludes the task itself and its descendants.
parent_id = fields . Many2one ( ' project.task ' , string = ' Parent Task ' , index = True , domain = " [ ' ! ' , ( ' id ' , ' child_of ' , id)] " , tracking = True )
child_ids = fields . One2many ( ' project.task ' , ' parent_id ' , string = " Sub-tasks " , domain = " [( ' recurring_task ' , ' = ' , False)] " , export_string_translation = False )
# Both counters are filled by the same compute method (_compute_subtask_count).
subtask_count = fields . Integer ( " Sub-task Count " , compute = ' _compute_subtask_count ' , export_string_translation = False )
closed_subtask_count = fields . Integer ( " Closed Sub-tasks Count " , compute = ' _compute_subtask_count ' , export_string_translation = False )
project_privacy_visibility = fields . Selection ( related = ' project_id.privacy_visibility ' , string = " Project Visibility " , tracking = False )
subtask_completion_percentage = fields . Float ( compute = " _compute_subtask_completion_percentage " , export_string_translation = False )
# Computed field about working time elapsed between record creation and assignation/closing.
working_hours_open = fields . Float ( compute = ' _compute_elapsed ' , string = ' Working Hours to Assign ' , digits = ( 16 , 2 ) , store = True , aggregator = " avg " )
working_hours_close = fields . Float ( compute = ' _compute_elapsed ' , string = ' Working Hours to Close ' , digits = ( 16 , 2 ) , store = True , aggregator = " avg " )
working_days_open = fields . Float ( compute = ' _compute_elapsed ' , string = ' Working Days to Assign ' , store = True , aggregator = " avg " )
working_days_close = fields . Float ( compute = ' _compute_elapsed ' , string = ' Working Days to Close ' , store = True , aggregator = " avg " )
# customer portal: include comment and (incoming/outgoing) emails in communication history
website_message_ids = fields . One2many ( domain = lambda self : [ ( ' model ' , ' = ' , self . _name ) , ( ' message_type ' , ' in ' , [ ' email ' , ' comment ' , ' email_outgoing ' ] ) ] , export_string_translation = False )
allow_milestones = fields . Boolean ( related = ' project_id.allow_milestones ' , export_string_translation = False )
# Milestone of the task, restricted to milestones of the task's project.
# Stored, computed by _compute_milestone_id and still editable.
# Both declarations below were missing their closing parenthesis.
milestone_id = fields.Many2one(
    ' project.milestone ',
    ' Milestone ',
    domain=" [( ' project_id ' , ' = ' , project_id)] ",
    compute=' _compute_milestone_id ',
    readonly=False,
    store=True,
    tracking=True,
    index=' btree_not_null ',
    help=" Deliver your services automatically when a milestone is reached by linking it to a sales order item. ",
)
# Non-stored flag, searchable through _search_has_late_and_unreached_milestone.
has_late_and_unreached_milestone = fields.Boolean(
    compute=' _compute_has_late_and_unreached_milestone ',
    search=' _search_has_late_and_unreached_milestone ',
    export_string_translation=False,
)
# Task Dependencies fields
allow_task_dependencies = fields . Boolean ( related = ' project_id.allow_task_dependencies ' , export_string_translation = False )
# Tracking of this field is done in the write function
# `depend_on_ids` and `dependent_ids` are the two directions of the same
# " task_dependencies_rel " relation table (their column1/column2 are swapped).
depend_on_ids = fields . Many2many ( ' project.task ' , relation = " task_dependencies_rel " , column1 = " task_id " ,
column2 = " depends_on_id " , string = " Blocked By " , tracking = True , copy = False ,
domain = " [( ' project_id ' , ' != ' , False), ( ' id ' , ' != ' , id)] " )
# Both counters are filled by _compute_depend_on_count, evaluated with sudo.
depend_on_count = fields . Integer ( string = " Depending on Tasks " , compute = ' _compute_depend_on_count ' , compute_sudo = True )
closed_depend_on_count = fields . Integer ( string = " Closed Depending on Tasks " , compute = ' _compute_depend_on_count ' , compute_sudo = True )
dependent_ids = fields . Many2many ( ' project.task ' , relation = " task_dependencies_rel " , column1 = " depends_on_id " ,
column2 = " task_id " , string = " Block " , copy = False ,
domain = " [( ' project_id ' , ' != ' , False), ( ' id ' , ' != ' , id)] " , export_string_translation = False )
dependent_tasks_count = fields . Integer ( string = " Dependent Tasks " , compute = ' _compute_dependent_tasks_count ' , export_string_translation = False )
# Project sharing fields
display_parent_task_button = fields . Boolean ( compute = ' _compute_display_parent_task_button ' , compute_sudo = True , export_string_translation = False )
current_user_same_company_partner = fields . Boolean ( compute = ' _compute_current_user_same_company_partner ' , compute_sudo = True , export_string_translation = False )
display_follow_button = fields . Boolean ( compute = ' _compute_display_follow_button ' , compute_sudo = True , export_string_translation = False )
# recurrence fields
# The four repeat_* fields share one compute method (_compute_repeat) and stay editable.
recurring_task = fields . Boolean ( string = " Recurrent " )
recurring_count = fields . Integer ( string = " Tasks in Recurrence " , compute = ' _compute_recurring_count ' )
recurrence_id = fields . Many2one ( ' project.task.recurrence ' , copy = False )
repeat_interval = fields . Integer ( string = ' Repeat Every ' , default = 1 , compute = ' _compute_repeat ' , compute_sudo = True , readonly = False )
repeat_unit = fields . Selection ( [
( ' day ' , ' Days ' ) ,
( ' week ' , ' Weeks ' ) ,
( ' month ' , ' Months ' ) ,
( ' year ' , ' Years ' ) ,
] , default = ' week ' , compute = ' _compute_repeat ' , compute_sudo = True , readonly = False )
repeat_type = fields . Selection ( [
( ' forever ' , ' Forever ' ) ,
( ' until ' , ' Until ' ) ,
] , default = " forever " , string = " Until " , compute = ' _compute_repeat ' , compute_sudo = True , readonly = False )
repeat_until = fields . Date ( string = " End Date " , compute = ' _compute_repeat ' , compute_sudo = True , readonly = False )
# Quick creation shortcuts
# Display name with compute, inverse and search methods; its help text lists
# the quick-creation keywords. The closing parenthesis of the declaration was
# missing. The help text is kept flush-left so the literal stays unchanged.
display_name = fields.Char(
    compute=' _compute_display_name ',
    inverse=' _inverse_display_name ',
    search=' _search_display_name ',
    help=""" Use these keywords in the title to set new tasks: \n
#tags Set tags on the task
@user Assign the task to a user
! Set the task a high priority \n
Make sure to use the right format and order e . g . Improve the configuration screen #feature #v16 @Mitchell !""",
)
link_preview_name = fields.Char(compute=' _compute_link_preview_name ', export_string_translation=False)

# Database-level checks: a recurring task has no parent, and a task without a
# project has no parent. The closing bracket of this list was missing.
_sql_constraints = [
    (' recurring_task_has_no_parent ', ' CHECK (NOT (recurring_task IS TRUE AND parent_id IS NOT NULL)) ', " A subtask cannot be recurrent. "),
    (' private_task_has_no_parent ', ' CHECK (NOT (project_id IS NULL AND parent_id IS NOT NULL)) ', " A private task cannot have a parent. "),
]
@api.constrains(' company_id ', ' partner_id ')
def _ensure_company_consistency_with_partner(self):
    """ Check that a task and its customer do not belong to different companies.

    :raises ValidationError: when both the task and its partner have a company
        set and the two differ.
    """
    for task in self:
        # Nothing to compare when there is no partner or the partner has no company.
        if not task.partner_id or not task.partner_id.company_id:
            continue
        if task.company_id and task.company_id != task.partner_id.company_id:
            raise ValidationError(_(' The task and the associated partner must be linked to the same company. '))
@api.constrains(' child_ids ', ' project_id ')
def _ensure_super_task_is_not_private(self):
    """ Ensures that a task which has sub-tasks is linked to a project.

    The previous docstring was copied from the company/partner constraint and
    described the wrong check.

    :raises ValidationError: when a task has no project while its
        `subtask_count` is non-zero.
    """
    for task in self:
        if not task.project_id and task.subtask_count:
            raise ValidationError(_(' This task has sub-tasks, so it can \' t be private. '))
@api.depends(' parent_id.project_id ')
def _compute_project_id(self):
    """ Align a sub-task's project with its parent's project.

    Only tasks that are not displayed in their project and that have a parent
    are touched.
    """
    # NOTE(review): presumably cancels the pending recomputation of
    # `display_in_project` for these records - confirm against the ORM.
    self.env.remove_to_compute(self._fields[' display_in_project '], self)
    for task in self:
        if task.display_in_project or not task.parent_id:
            continue
        if task.parent_id.project_id != task.project_id:
            task.project_id = task.parent_id.project_id
@api.onchange(' parent_id ')
def _onchange_parent_id(self):
    """ React to a parent change on a task that is not displayed in its project.

    - Parent removed: the task must now be displayed in its project.
    - Parent set with another project: the task takes the parent's project.

    The guard used to be `if self.display_in_project`, which made the
    `display_in_project = True` assignment a no-op and moved the project of
    tasks that are displayed in their own project. It is now negated, matching
    the condition used by `_compute_project_id`.
    """
    if self.display_in_project:
        return
    if not self.parent_id:
        self.display_in_project = True
    elif self.project_id != self.parent_id.project_id:
        self.project_id = self.parent_id.project_id
@api.depends(' project_id ')
def _compute_display_in_project(self):
    """ Flag tasks that must be displayed in their project.

    A task not yet flagged becomes displayed when it has no project or when its
    project differs from its parent's project. The expression was missing one
    closing parenthesis, which is restored here.
    """
    self.filtered(
        lambda t: not t.display_in_project and (
            not t.project_id or t.project_id != t.parent_id.project_id
        )
    ).display_in_project = True
@api.depends(' project_id ', ' parent_id ')
def _compute_show_display_in_project(self):
    """ True for sub-tasks that share the project of their parent task. """
    for task in self:
        parent = task.parent_id
        task.show_display_in_project = bool(parent) and task.project_id == parent.project_id
@api.depends(' stage_id ', ' depend_on_ids.state ', ' project_id.allow_task_dependencies ')
def _compute_state(self):
    """ Derive the task state from its blocking tasks.

    A task whose state is in CLOSED_STATES is left untouched. Any other task
    becomes ' 04_waiting_normal ' while at least one blocking task is still
    open, and ' 01_in_progress ' otherwise. Blocking tasks are only considered
    when the task allows dependencies.
    """
    for task in self:
        open_blockers = []
        if task.allow_task_dependencies:
            open_blockers = [
                blocker for blocker in task.depend_on_ids
                if blocker.state not in CLOSED_STATES
            ]
        # An already closed task is never moved to waiting or in progress.
        if task.state in CLOSED_STATES:
            continue
        task.state = ' 04_waiting_normal ' if open_blockers else ' 01_in_progress '
@api.depends(' state ')
def _compute_is_closed(self):
    """ A task is closed when its state is one of the CLOSED_STATES keys. """
    for record in self:
        closed = record.state in CLOSED_STATES
        record.is_closed = closed
def _search_is_closed(self, operator, value):
    """ Search method of `is_closed`, translated into a domain on `state`.

    The `domain` list literal was never closed; the domain is now returned directly.

    :param operator: only ' = ' and ' != ' are supported
    :param value: must be a bool
    :return: a domain selecting the open states or the closed states
    :raises NotImplementedError: for any other operator or a non-bool value
    """
    if operator not in (' = ', ' != ') or not isinstance(value, bool):
        raise NotImplementedError(_(
            " The search does not support operator %(operator)s or value %(value)s . ",
            operator=operator,
            value=value,
        ))
    # "not closed" can be spelled `!= True` or `= False`.
    if (operator == ' != ' and value) or (operator == ' = ' and not value):
        searched_states = self.OPEN_STATES
    else:
        searched_states = list(CLOSED_STATES.keys())
    return [(' state ', ' in ', searched_states)]
@property
def OPEN_STATES(self):
    """ Return a list of the technical names complementing the CLOSED_STATES, a.k.a the open states.

    Exposed as a property because `_search_is_closed` reads `self.OPEN_STATES`
    as an attribute. As a plain method, the bound method object itself would
    end up in the search domain.
    """
    return list(set(self._fields[' state '].get_values(self.env)) - set(CLOSED_STATES))
@api.onchange(' project_id ')
def _onchange_project_id(self):
    """ Reset the state to ' 01_in_progress ' on project change, unless the task is waiting. """
    if self.state == ' 04_waiting_normal ':
        return
    self.state = ' 01_in_progress '
def is_blocked_by_dependences(self):
    """ Tell whether at least one task in `depend_on_ids` is not in a closed state. """
    for blocking_task in self.depend_on_ids:
        if blocking_task.state not in CLOSED_STATES:
            return True
    return False
def _inverse_state(self):
    """ Create the next occurrence when the last task of a recurrence is closed. """
    last_task_ids = self.recurrence_id._get_last_task_id_per_recurrence_id()
    for task in self:
        if task.state not in CLOSED_STATES:
            continue
        # Only the most recent task of a recurrence spawns the next one.
        if task.id == last_task_ids.get(task.recurrence_id.id):
            task.recurrence_id._create_next_occurrence(task)
@api.depends_context(' uid ')
@api.depends(' user_ids ')
def _compute_personal_stage_id(self):
    """ Link each task to the current user's personal stage record, if any. """
    # A user may only access their own personal stage, and there is at most
    # one (user, task) pair.
    own_stages = self.env[' project.task.stage.personal '].search([
        (' user_id ', ' = ', self.env.uid),
        (' task_id ', ' in ', self.ids),
    ])
    # Default for tasks without a personal stage for this user.
    self.personal_stage_id = False
    for own_stage in own_stages:
        own_stage.task_id.personal_stage_id = own_stage
@api.depends(' personal_stage_id ')
def _compute_personal_stage_type_id(self):
    """ Mirror the stage of the current user's personal stage record. """
    for record in self:
        personal_stage = record.personal_stage_id
        record.personal_stage_type_id = personal_stage.stage_id
def _inverse_personal_stage_type_id ( self ) :
for task in self :
task . personal_stage_id . stage_id = task . personal_stage_type_id
def _search_personal_stage_type_id ( self , operator , value ) :
return [ ( ' personal_stage_type_ids ' , operator , value ) ]
def _get_default_personal_stage_create_vals(self, user_id):
    """ Build the create values of the seven default personal stages of a user.

    The returned list literal was missing its closing bracket.

    :param user_id: id written as ' user_id ' on every stage
    :return: list of dicts; only the last two stages are folded
    """
    return [
        {' sequence ': 1, ' name ': _(' Inbox '), ' user_id ': user_id, ' fold ': False},
        {' sequence ': 2, ' name ': _(' Today '), ' user_id ': user_id, ' fold ': False},
        {' sequence ': 3, ' name ': _(' This Week '), ' user_id ': user_id, ' fold ': False},
        {' sequence ': 4, ' name ': _(' This Month '), ' user_id ': user_id, ' fold ': False},
        {' sequence ': 5, ' name ': _(' Later '), ' user_id ': user_id, ' fold ': False},
        {' sequence ': 6, ' name ': _(' Done '), ' user_id ': user_id, ' fold ': True},
        {' sequence ': 7, ' name ': _(' Cancelled '), ' user_id ': user_id, ' fold ': True},
    ]
def _populate_missing_personal_stages(self):
    """ Assign a personal stage to the personal stage records that lack one.

    For each affected user, the first existing ' project.task.type ' of that
    user is used. When the user has none, the default personal stages are
    created in the user's language and the first of them is used. The
    `create(` call was missing its closing parenthesis.
    """
    # Assign the default personal stage for those that are missing
    personal_stages_without_stage = self.env[' project.task.stage.personal '].sudo().search(
        [(' task_id ', ' in ', self.ids), (' stage_id ', ' = ', False)]
    )
    if not personal_stages_without_stage:
        return
    user_ids = personal_stages_without_stage.user_id
    personal_stage_by_user = defaultdict(lambda: self.env[' project.task.stage.personal '])
    for personal_stage in personal_stages_without_stage:
        personal_stage_by_user[personal_stage.user_id] |= personal_stage
    for user_id in user_ids:
        stage = self.env[' project.task.type '].sudo().search([(' user_id ', ' = ', user_id.id)], limit=1)
        # In the case no stages have been found, we create the default stages for the user
        if not stage:
            stages = self.env[' project.task.type '].sudo().with_context(lang=user_id.partner_id.lang, default_project_ids=False).create(
                self.with_context(lang=user_id.partner_id.lang)._get_default_personal_stage_create_vals(user_id.id)
            )
            stage = stages[0]
        personal_stage_by_user[user_id].sudo().write({' stage_id ': stage.id})
def message_subscribe(self, partner_ids=None, subtype_ids=None):
    """ Set task notification based on project notification preference if user follow the project

    When no subtype is given, each partner that already follows the project is
    subscribed with subtypes derived from its project subscription. The
    remaining partners are subscribed with the given subtypes.

    Fixes over the previous version:
    - `partner_ids=None` no longer crashes on the `in partner_ids` test;
    - the caller's list is no longer mutated, a copy is used instead;
    - a partner returned by several follower records no longer raises
      `ValueError` on the second `remove`.

    :param partner_ids: ids of the partners to subscribe (may be None)
    :param subtype_ids: explicit subtypes; when set, no project lookup is done
    :return: result of the parent `message_subscribe`
    """
    if partner_ids and not subtype_ids:
        partner_ids = list(partner_ids)
        project_followers = self.project_id.message_follower_ids.filtered(lambda f: f.partner_id.id in partner_ids)
        for project_follower in project_followers:
            project_subtypes = project_follower.subtype_ids
            task_subtypes = (project_subtypes.mapped(' parent_id ') | project_subtypes.filtered(lambda sub: sub.internal or sub.default)).ids if project_subtypes else None
            follower_partner_id = project_follower.partner_id.id
            if follower_partner_id in partner_ids:
                partner_ids.remove(follower_partner_id)
            super().message_subscribe(project_follower.partner_id.ids, task_subtypes)
    return super().message_subscribe(partner_ids, subtype_ids)
@api.constrains(' depend_on_ids ')
def _check_no_cyclic_dependencies(self):
    """ Reject dependency cycles.

    :raises ValidationError: when `_has_cycle` reports a cycle through `depend_on_ids`.
    """
    if not self._has_cycle(' depend_on_ids '):
        return
    raise ValidationError(_(" Two tasks cannot depend on each other. "))
def _get_recurrence_fields ( self ) :
return [
' repeat_interval ' ,
' repeat_unit ' ,
' repeat_type ' ,
' repeat_until ' ,
@api.depends(' recurring_task ')
def _compute_repeat(self):
    """ Fill the recurrence fields of each task.

    Values come from the linked recurrence (read with sudo) when there is one,
    from the field defaults when the task is flagged recurring, and are False
    otherwise.
    """
    recurrence_fields = self._get_recurrence_fields()
    default_values = self.default_get(recurrence_fields)
    for task in self:
        for field_name in recurrence_fields:
            if task.recurrence_id:
                value = task.recurrence_id.sudo()[field_name]
            elif task.recurring_task:
                value = default_values.get(field_name)
            else:
                value = False
            task[field_name] = value
def _is_recurrence_valid ( self ) :
self . ensure_one ( )
return self . repeat_interval > 0 and \
( self . repeat_type != ' until ' or self . repeat_until and self . repeat_until > fields . Date . today ( ) )
@api.depends(' recurrence_id ')
def _compute_recurring_count(self):
    """ Count, for each task, the tasks that share its recurrence (0 without recurrence). """
    self.recurring_count = 0
    recurring_tasks = self.filtered(lambda l: l.recurrence_id)
    grouped = self.env[' project.task ']._read_group(
        [(' recurrence_id ', ' in ', recurring_tasks.recurrence_id.ids)],
        [' recurrence_id '],
        [' __count '],
    )
    total_per_recurrence = {recurrence.id: total for recurrence, total in grouped}
    for task in recurring_tasks:
        task.recurring_count = total_per_recurrence.get(task.recurrence_id.id, 0)
@api.depends(' depend_on_ids ')
def _compute_depend_on_count(self):
    """ Count the blocking tasks of each task, and how many of them are closed.

    Tasks that do not allow dependencies get 0 for both counters. Records with
    no database id are counted from their in-memory `depend_on_ids`; the others
    are counted with one grouped query.

    Fixes: the `_read_group(` call and the dict comprehension were not closed,
    and the unsaved-records branch did not return, so its values were
    overwritten by the grouped query below.
    """
    tasks_with_dependency = self.filtered(' allow_task_dependencies ')
    tasks_without_dependency = self - tasks_with_dependency
    tasks_without_dependency.depend_on_count = 0
    tasks_without_dependency.closed_depend_on_count = 0
    if not any(self._ids):
        for task in self:
            task.depend_on_count = len(task.depend_on_ids)
            task.closed_depend_on_count = len(task.depend_on_ids.filtered(lambda r: r.state in CLOSED_STATES))
        return
    if tasks_with_dependency:
        # need the sudo for project sharing
        total_and_closed_depend_on_count = {
            dependent_on.id: (count, sum(s in CLOSED_STATES for s in states))
            for dependent_on, states, count in self.env[' project.task ']._read_group(
                [(' dependent_ids ', ' in ', tasks_with_dependency.ids)],
                [' dependent_ids '],
                [' state:array_agg ', ' __count '],
            )
        }
        for task in tasks_with_dependency:
            task.depend_on_count, task.closed_depend_on_count = total_and_closed_depend_on_count.get(task._origin.id or task.id, (0, 0))
@api.depends(' dependent_ids ')
def _compute_dependent_tasks_count(self):
    """ Count, for each task, the open tasks that depend on it.

    Tasks that do not allow dependencies get 0. The dict comprehension was
    missing its closing brace.
    """
    tasks_with_dependency = self.filtered(' allow_task_dependencies ')
    (self - tasks_with_dependency).dependent_tasks_count = 0
    if tasks_with_dependency:
        group_dependent = self.env[' project.task ']._read_group([
            (' depend_on_ids ', ' in ', tasks_with_dependency.ids),
            (' is_closed ', ' = ', False),
        ], [' depend_on_ids '], [' __count '])
        dependent_tasks_count_dict = {
            depend_on.id: count
            for depend_on, count in group_dependent
        }
        for task in tasks_with_dependency:
            task.dependent_tasks_count = dependent_tasks_count_dict.get(task.id, 0)
@api.constrains(' parent_id ')
def _check_parent_id(self):
    """ Reject recursive task hierarchies.

    :raises ValidationError: when `_has_cycle` reports a cycle.
    """
    if not self._has_cycle():
        return
    raise ValidationError(_(' Error! You cannot create a recursive hierarchy of tasks. '))
def _get_attachments_search_domain ( self ) :
self . ensure_one ( )
return [ ( ' res_id ' , ' = ' , self . id ) , ( ' res_model ' , ' = ' , ' project.task ' ) ]
def _compute_attachment_ids(self):
    """ Set `attachment_ids` to the task attachments that do not come from a message. """
    for task in self:
        linked_ids = set(self.env[' ir.attachment '].search(task._get_attachments_search_domain()).ids)
        # Attachments reachable through the task's messages (from mail_thread).
        from_message_ids = set(task.mapped(' message_ids.attachment_ids ').ids)
        task.attachment_ids = [(6, 0, list(linked_ids - from_message_ids))]
@api.depends(' create_date ', ' date_end ', ' date_assign ')
def _compute_elapsed(self):
    """ Compute the working hours and days between creation and assignment or closing.

    Durations come from the project's resource calendar
    (`get_work_duration_data`, leaves included). Tasks without a project
    calendar or without a creation date get 0.0 on all four fields, as does a
    missing `date_assign` or `date_end` for its pair of fields. The `filtered(`
    call was missing its closing parenthesis.
    """
    task_linked_to_calendar = self.filtered(
        lambda task: task.project_id.resource_calendar_id and task.create_date
    )
    for task in task_linked_to_calendar:
        calendar = task.project_id.resource_calendar_id
        dt_create_date = fields.Datetime.from_string(task.create_date)

        if task.date_assign:
            dt_date_assign = fields.Datetime.from_string(task.date_assign)
            duration_data = calendar.get_work_duration_data(dt_create_date, dt_date_assign, compute_leaves=True)
            task.working_hours_open = duration_data[' hours ']
            task.working_days_open = duration_data[' days ']
        else:
            task.working_hours_open = 0.0
            task.working_days_open = 0.0

        if task.date_end:
            dt_date_end = fields.Datetime.from_string(task.date_end)
            duration_data = calendar.get_work_duration_data(dt_create_date, dt_date_end, compute_leaves=True)
            task.working_hours_close = duration_data[' hours ']
            task.working_days_close = duration_data[' days ']
        else:
            task.working_hours_close = 0.0
            task.working_days_close = 0.0

    (self - task_linked_to_calendar).update(dict.fromkeys(
        [' working_hours_open ', ' working_hours_close ', ' working_days_open ', ' working_days_close '], 0.0))
def _compute_access_url(self):
    """ Set each task's `access_url` from its id, after the parent computation. """
    super(Task, self)._compute_access_url()
    for record in self:
        record.access_url = f' /my/tasks/ {record.id} '
def _compute_access_warning(self):
    """ Warn when a task cannot be shared because of its project's privacy.

    Tasks whose project visibility is not ' portal ' get a message naming the
    visibility to select. The `_(` call was missing its closing parenthesis.
    The two lookups that resolve the label of the ' portal ' option do not
    depend on the task, so they now run once instead of once per task.
    """
    super(Task, self)._compute_access_warning()
    restricted_tasks = self.filtered(lambda x: x.project_id.privacy_visibility != ' portal ')
    if not restricted_tasks:
        return
    visibility_field = self.env[' ir.model.fields '].search([(' model ', ' = ', ' project.project '), (' name ', ' = ', ' privacy_visibility ')], limit=1)
    visibility_public = self.env[' ir.model.fields.selection '].search([(' field_id ', ' = ', visibility_field.id), (' value ', ' = ', ' portal ')])
    for task in restricted_tasks:
        task.access_warning = _(
            " The task cannot be shared with the recipient(s) because the privacy of the project is too restricted. Set the privacy of the project to ' %(visibility)s ' in order to make it accessible by the recipient(s). ",
            visibility=visibility_public.name,
        )
@api.depends(' child_ids.allocated_hours ')
def _compute_subtask_allocated_hours(self):
    """ Sum the allocated hours of the direct sub-tasks of each task. """
    for parent_task in self:
        child_hours = parent_task.child_ids.mapped(' allocated_hours ')
        parent_task.subtask_allocated_hours = sum(child_hours)
@api.depends('child_ids')
def _compute_subtask_count(self):
    """Compute ``subtask_count`` and ``closed_subtask_count``.

    When none of the records has a database id, both counters are derived
    from ``child_ids`` in memory. Otherwise one grouped query on
    ``parent_id`` returns, per parent, the number of sub-tasks and the list
    of their states; a state listed in ``CLOSED_STATES`` counts as closed.
    """
    if not any(self._ids):
        for task in self:
            children = task.child_ids
            task.subtask_count = len(children)
            task.closed_subtask_count = len(children.filtered(lambda r: r.state in CLOSED_STATES))
        # Stop here: the grouped query below finds nothing for these records
        # and would overwrite the values just computed with (0, 0).
        return
    total_and_closed_subtask_count_per_parent_id = {
        parent.id: (count, sum(s in CLOSED_STATES for s in states))
        for parent, states, count in self.env['project.task']._read_group(
            [('parent_id', 'in', self.ids)],
            ['parent_id'],
            ['state:array_agg', '__count'],
        )
    }
    for task in self:
        task.subtask_count, task.closed_subtask_count = total_and_closed_subtask_count_per_parent_id.get(task.id, (0, 0))
@api.onchange('company_id')
def _onchange_task_company(self):
    """Clear the project when its company differs from the task's company."""
    project_company = self.project_id.company_id
    if project_company and project_company != self.company_id:
        self.project_id = False
@api.depends('project_id.company_id', 'parent_id.company_id')
def _compute_company_id(self):
    """Take the company from the project, falling back on the parent task.

    Tasks that have neither a project nor a parent are left untouched so
    that their current company is kept.
    """
    for task in self:
        if not task.parent_id and not task.project_id:
            # Nothing to inherit from: keep the company as it is.
            continue
        task.company_id = task.project_id.company_id or task.parent_id.company_id
@api.depends('project_id')
def _compute_stage_id(self):
    """Keep the stage consistent with the task's (or its parent's) project.

    Without any project the stage is cleared. With one, the stage is only
    replaced when it is not linked to that project; the replacement is the
    first non-folded stage returned by ``stage_find``.
    """
    for task in self:
        project = task.project_id or task.parent_id.project_id
        if not project:
            task.stage_id = False
        elif project not in task.stage_id.project_ids:
            task.stage_id = task.stage_find(project.id, [('fold', '=', False)])
@api.depends('user_ids')
def _compute_portal_user_names(self):
    """Compute the formatted list of assignee names of each task.

    In the project sharing feature, ``user_ids`` only contains, for a portal
    user, the users of the same company as the current user. This field acts
    like a related on ``user_ids.name`` but with more records than the
    portal user can normally see; it is only used in project sharing views
    to show all the assignees of each task.
    """
    stored_tasks = self._origin
    if stored_tasks:
        # Drop the cached 'user_ids' and fetch them again through the origin
        # records, which avoids a miscache because of the new ids contained
        # in self.
        # NOTE(review): the original comment says this fetch happens in
        # superuser mode; no sudo() is visible here - presumably the compute
        # itself runs as superuser, confirm.
        self.invalidate_recordset(fnames=['user_ids'])
        stored_tasks.fetch(['user_ids'])
    for task in self.with_context(prefetch_fields=False):
        assignee_names = task.user_ids.mapped('name')
        task.portal_user_names = format_list(self.env, assignee_names)
def _search_portal_user_names(self, operator, value):
    """Search tasks on the partner name of their assignees.

    :param operator: domain operator; the query always performs an ``ILIKE``
    :param value: text looked for inside the assignees' partner names
    :return: a domain on ``id`` backed by a SQL sub-query
    :raise ValidationError: when ``operator`` is not ``'ilike'`` and
        ``value`` is not a string
    """
    # NOTE(review): because of the `and`, an operator other than 'ilike'
    # used with a string value is accepted and still evaluated as ILIKE -
    # confirm this is intended.
    not_implemented = operator != 'ilike' and not isinstance(value, str)
    if not_implemented:
        raise ValidationError(_('Not Implemented.'))

    assignee_query = SQL(
        """(
            SELECT task_user.task_id
              FROM project_task_user_rel task_user
        INNER JOIN res_users users ON task_user.user_id = users.id
        INNER JOIN res_partner partners ON partners.id = users.partner_id
             WHERE partners.name ILIKE %s
        )""",
        f"%{value}%",
    )
    return [('id', 'in', assignee_query)]
def _compute_display_parent_task_button(self):
    """Show the parent-task button only when the current user can read the parent."""
    readable_parents = self.parent_id.with_user(self.env.user)._filtered_access('read')
    for task in self:
        task.display_parent_task_button = task.parent_id in readable_parents
def _compute_current_user_same_company_partner(self):
    """Tell whether the task's customer shares the current user's commercial partner."""
    user_commercial_partner = self.env.user.partner_id.commercial_partner_id
    for task in self:
        task.current_user_same_company_partner = (
            task.partner_id
            and user_commercial_partner == task.partner_id.commercial_partner_id
        )
def _compute_display_follow_button(self):
    """Decide whether the follow button is displayed on each task.

    The button is never displayed when the current user is not a share user.
    Otherwise it is displayed unless every ``project.collaborator`` record
    of the current partner on the task's project has ``limited_access`` set;
    a project without such a record counts as limited.
    """
    if not self.env.user.share:
        self.display_follow_button = False
        # The collaborator lookup below only concerns share users.
        return
    project_collaborator_read_group = self.env['project.collaborator']._read_group(
        [('project_id', 'in', self.project_id.ids), ('partner_id', '=', self.env.user.partner_id.id)],
        ['project_id'],
        ['limited_access:bool_and'],
    )
    limited_access_per_project_id = dict(project_collaborator_read_group)
    for task in self:
        task.display_follow_button = not limited_access_per_project_id.get(task.project_id, True)
def _get_group_pattern ( self ) :
return {
' tags_and_users ' : r ' \ s([#@] %s [^ \ s]+) ' ,
' priority ' : r ' \ s(!) ' ,
def _prepare_pattern_groups(self):
    """Return the group patterns ready for use, placeholder filled with ''."""
    patterns = self._get_group_pattern()
    tags_and_users = patterns['tags_and_users'] % ''
    return [tags_and_users, patterns['priority']]
def _get_groups_patterns(self):
    """Return the regex fragment matching any repetition of the known groups."""
    alternatives = '|'.join(self._prepare_pattern_groups())
    return [r'(?:%s)*' % alternatives]
def _get_cannot_start_with_patterns ( self ) :
return [ r ' (?![#!@ \ s]) ' ]
def _extract_tags_and_users(self):
    """Turn the ``#tag`` and ``@user`` words of ``display_name`` into values.

    Each ``@user`` word is resolved with ``name_search`` on ``res.users``;
    it is linked as assignee only when exactly one user matches, otherwise
    the word is kept in the title. Each ``#tag`` word is matched
    case-insensitively against ``project.tags``; unknown tags are created.
    The words that were consumed are then removed from ``display_name``.
    """
    tags = []
    users = []
    tags_and_users_group = self._get_group_pattern()['tags_and_users']
    for word in re.findall(tags_and_users_group % '', self.display_name):
        (tags if word.startswith('#') else users).append(word[1:])
    users_to_keep = []
    user_ids = []
    for user in users:
        matched_users = self.env['res.users'].name_search(user)
        if len(matched_users) == 1:
            user_ids.append(Command.link(matched_users[0][0]))
        else:
            # The word is typed by the user and injected in a regex below:
            # escape it so that metacharacters are matched literally instead
            # of changing (or breaking) the pattern.
            users_to_keep.append(r'%s\b' % re.escape(user))
    self.user_ids = user_ids
    if tags:
        domain = expression.OR([[('name', '=ilike', tag)] for tag in tags])
        existing_tags = self.env['project.tags'].search(domain)
        existing_tags_names = {tag.name.lower() for tag in existing_tags}
        new_tags_names = {tag for tag in tags if tag.lower() not in existing_tags_names}
        self.tag_ids = [Command.set(existing_tags.ids)] + [Command.create({'name': name}) for name in new_tags_names]
    # Unresolved users are protected by a negative lookahead so that only
    # the consumed words are stripped from the title.
    keep_lookahead = '(?!%s)' % '|'.join(users_to_keep) if users_to_keep else ''
    pattern = tags_and_users_group % keep_lookahead
    self.display_name = re.sub(pattern, '', self.display_name)
def _extract_priority(self):
    """Set the priority to "1" and remove the priority marker from the title."""
    self.priority = "1"
    marker_pattern = self._get_group_pattern()['priority']
    self.display_name = re.sub(marker_pattern, '', self.display_name)
def _get_groups ( self ) :
return [
lambda task : task . _extract_tags_and_users ( ) ,
lambda task : task . _extract_priority ( ) ,
def _inverse_display_name(self):
    """Parse ``display_name`` and store the cleaned title in ``name``.

    The title is matched against the pattern built from the "cannot start
    with" fragments and the group fragments. Each extractor whose regex
    group matched is run; the stripped ``display_name`` then becomes the
    task name.
    """
    for task in self:
        prefix = ''.join(task._get_cannot_start_with_patterns())
        suffix = ''.join(task._get_groups_patterns())
        title_regex = re.compile(r'^%s.+?%s$' % (prefix, suffix))
        match = title_regex.match(task.display_name)
        if not match:
            continue
        # Regex groups are numbered from 1, in the order of the extractors.
        for group, extract_data in enumerate(task._get_groups(), start=1):
            if match.group(group):
                extract_data(task)
        task.name = task.display_name.strip()
def _compute_link_preview_name(self):
    """Build the link preview name: the display name, plus the project name if any."""
    for task in self:
        preview = task.display_name
        if task.project_id:
            # Read the project name in sudo, as the original code does.
            project_name = task.project_id.sudo().name
            preview += f' | {project_name}'
        task.link_preview_name = preview
def copy_data(self, default=None):
    """Prepare the values used to duplicate the tasks and their sub-tasks.

    :param default: optional dict of values overriding the copied ones
    :return: list of value dicts, one per task in ``self``

    Users outside ``project.group_project_user`` only get the values of the
    fields listed in ``SELF_READABLE_FIELDS``. The context keys
    ``copy_project`` and ``milestone_mapping`` are read to name the copy
    and to remap its milestone.
    """
    default = dict(default or {})
    vals_list = super().copy_data(default=default)
    if not self.env.user.has_group('project.group_project_user'):
        vals_list = [{k: v for k, v in vals.items() if k in self.SELF_READABLE_FIELDS} for vals in vals_list]

    copy_project = self.env.context.get('copy_project')
    milestone_mapping = self.env.context.get('milestone_mapping', {})
    for task, vals in zip(self, vals_list):
        if not default.get('stage_id'):
            vals['stage_id'] = task.stage_id.id
        if 'active' not in default and not task['active'] and not copy_project:
            vals['active'] = True
        vals['name'] = task.name if copy_project else _("%s (copy)", task.name)
        if task.recurrence_id and not default.get('recurrence_id'):
            vals['recurrence_id'] = task.recurrence_id.copy().id
        if task.allow_milestones:
            vals['milestone_id'] = milestone_mapping.get(vals['milestone_id'], vals['milestone_id'])
        if task.child_ids and not default.get('child_ids'):
            # Use a dedicated dict for the children: rebinding `default`
            # here would discard the caller's defaults for every task
            # handled later in this loop.
            child_default = {
                'depend_on_ids': False,
                'dependent_ids': False,
                'parent_id': False,
            }
            vals['child_ids'] = [Command.create(child.copy_data(child_default)[0]) for child in task.child_ids]
    return vals_list
def _create_task_mapping(self, copied_tasks):
    """Map every original task (sub-tasks included) to its copy.

    Thanks to the way create and Command.create are handled, when a task
    with 2 children is copied, the children of the copied task have the
    same index in the ``child_ids`` recordset. This is used to pair the
    original tasks with their copies.

    :param copied_tasks: the copies, in the same order as ``self``
    :return: a tuple ``(task_mapping, task_dependencies)`` where
        ``task_mapping`` maps ``original_task.id`` to the copied task and
        ``task_dependencies`` maps ``original_task.id`` to
        ``[original_task.depend_on_ids.ids, original_task.dependent_ids.ids]``
        for the tasks that have dependencies
    """
    task_mapping = {}
    task_dependencies = {}
    for original, duplicate in zip(self, copied_tasks):
        task_mapping[original.id] = duplicate
        has_dependencies = original.depend_on_ids or original.dependent_ids
        if original.allow_task_dependencies and has_dependencies:
            task_dependencies[original.id] = [original.depend_on_ids.ids, original.dependent_ids.ids]
        if not original.child_ids:
            continue
        # Recurse on the children to get their mapping and dependencies too.
        children_mapping, children_dependencies = original.child_ids._create_task_mapping(duplicate.child_ids)
        task_mapping.update(children_mapping)
        task_dependencies.update(children_dependencies)
    return task_mapping, task_dependencies
def _portal_get_parent_hash_token(self, pid):
    """Return the token obtained by signing ``pid`` with the task's project."""
    project = self.project_id
    return project._sign_token(pid)
def copy(self, default=None):
    """Duplicate the tasks and rebuild the dependencies between the copies.

    :param default: optional dict of values overriding the copied ones;
        it is not modified
    :return: the copied tasks

    Dependencies are not copied as such: they are re-assigned afterwards,
    replacing each dependency that was copied too by its copy.
    """
    # Work on a copy so that the caller's dict is left untouched.
    default = dict(default or {})
    default.update({
        'depend_on_ids': False,
        'dependent_ids': False,
    })
    copied_tasks = super(Task, self.with_context(
        mail_auto_subscribe_no_notify=True,
        mail_create_nosubscribe=True,
        mail_create_nolog=True,
    )).copy(default=default)
    task_mapping, task_dependencies = self._create_task_mapping(copied_tasks)
    for original_task_id, (depend_on_ids, dependant_ids) in task_dependencies.items():
        # If one of the task_id in the dependencies mapping is also a key of the task_mapping, it means that this task was copied too.
        # In this case, we should exchange this id with the id of the corresponding copied task
        task_mapping[original_task_id].depend_on_ids = [
            task_id if task_id not in task_mapping else task_mapping[task_id].id
            for task_id in depend_on_ids
        ]
        task_mapping[original_task_id].dependent_ids = [
            task_id if task_id not in task_mapping else task_mapping[task_id].id
            for task_id in dependant_ids
        ]
    return copied_tasks
def get_empty_list_help(self, help):
    """Customise the empty-list help with the task label of the default project.

    The document name is the lower-cased ``label_tasks`` of the project
    given by ``default_project_id`` in the context, or the translation of
    "task" when there is none.
    """
    default_project_id = self.env.context.get('default_project_id', False)
    document_name = _("task")
    if default_project_id:
        label = self.env['project.project'].browse(default_project_id).label_tasks
        if label:
            document_name = label.lower()

    self = self.with_context(
        empty_list_help_id=self.env.context.get('default_project_id'),
        empty_list_help_model='project.project',
        empty_list_help_document_name=document_name,
    )
    return super(Task, self).get_empty_list_help(help)
# ----------------------------------------
# Case management
# ----------------------------------------
def stage_find(self, section_id, domain=None, order='sequence, id'):
    """Return the id of the first stage linked to one of the given projects.

    :param section_id: id of a project the stage may belong to (optional)
    :param domain: extra domain leaves appended to the search (optional)
    :param order: order of the search; the first record found is returned
    :return: id of the ``project.task.type`` found, ``False`` if none

    The projects of the tasks in ``self`` are searched as well.
    """
    # collect all section_ids
    section_ids = [section_id] if section_id else []
    section_ids.extend(self.mapped('project_id').ids)
    search_domain = []
    if section_ids:
        # OR together one ('project_ids', '=', id) leaf per project
        search_domain = ['|'] * (len(section_ids) - 1)
        search_domain.extend(('project_ids', '=', project_id) for project_id in section_ids)
    search_domain += list(domain or [])
    # perform search, return the first found
    return self.env['project.task.type'].search(search_domain, order=order, limit=1).id
# ------------------------------------------------
# CRUD overrides
# ------------------------------------------------
def fields_get(self, allfields=None, attributes=None):
    """Restrict the field descriptions returned to portal users.

    Portal users only get the fields of ``SELF_READABLE_FIELDS``; those
    missing from ``SELF_WRITABLE_FIELDS`` are flagged readonly. Other users
    get the regular result.
    """
    descriptions = super().fields_get(allfields=allfields, attributes=attributes)
    if not self.env.user._is_portal():
        return descriptions

    readable = self.SELF_READABLE_FIELDS
    writable = self.SELF_WRITABLE_FIELDS
    public_fields = {}
    for fname, description in descriptions.items():
        if fname not in readable:
            continue
        # A readable field that is not writable is forced to readonly.
        if fname not in writable and not description.get('readonly', False):
            description['readonly'] = True
        public_fields[fname] = description
    return public_fields
def _get_view_cache_key(self, view_id=None, view_type='form', **options):
    """Add the portal status of the user to the view cache key.

    The override of fields_get makes fields readonly for portal users, so
    the cached view depends on whether the user is a portal user.
    """
    base_key = super()._get_view_cache_key(view_id, view_type, **options)
    is_portal = self.env.user._is_portal()
    return base_key + (is_portal,)
def default_get(self, default_fields):
    """Return the default values of a new task.

    :param default_fields: names of the fields whose default is requested
    :return: dict of default values

    On top of the regular defaults: a default state of
    ``'04_waiting_normal'`` is replaced by ``'01_in_progress'``,
    ``repeat_until`` defaults to one week from today, an empty
    ``partner_id`` is looked up from the project and the parent task, the
    company follows the project and, without a project, the current user is
    added to the assignees.
    """
    vals = super(Task, self).default_get(default_fields)

    # prevent creating new task in the waiting state
    if 'state' in default_fields and vals.get('state') == '04_waiting_normal':
        vals['state'] = '01_in_progress'

    if 'repeat_until' in default_fields:
        vals['repeat_until'] = fields.Date.today() + timedelta(days=7)

    if 'partner_id' in vals and not vals['partner_id']:
        # if the default_partner_id=False or no default_partner_id then we search the partner based on the project and parent
        project_id = vals.get('project_id')
        parent_id = vals.get('parent_id', self.env.context.get('default_parent_id'))
        if project_id or parent_id:
            partner_id = self._get_default_partner_id(
                project_id and self.env['project.project'].browse(project_id),
                parent_id and self.env['project.task'].browse(parent_id),
            )
            if partner_id:
                vals['partner_id'] = partner_id

    project_id = vals.get('project_id', self.env.context.get('default_project_id'))
    if project_id:
        project = self.env['project.project'].browse(project_id)
        if 'company_id' in default_fields and 'default_project_id' not in self.env.context:
            vals['company_id'] = project.sudo().company_id.id
    elif 'default_user_ids' not in self.env.context and 'user_ids' in default_fields:
        # No project: the current user is added to the assignees.
        user_ids = vals.get('user_ids', [])
        user_ids.append(Command.link(self.env.user.id))
        vals['user_ids'] = user_ids

    return vals
def _ensure_fields_are_accessible(self, fields, operation='read', check_group_user=True):
    """Ensure all the given fields are accessible by the current user.

    By default the check only applies when the current user is a portal
    user. Nothing is checked in superuser mode or when ``fields`` is empty.

    :param fields: list of fields to check
    :param operation: 'read' to check against the readable fields or
        'write' to check against the writable fields
    :param check_group_user: True when the method has to check that the
        current user is a portal one; False when the caller is already sure
        of it
    :raise AccessError: when at least one field is not accessible
    """
    assert operation in ('read', 'write'), 'Invalid operation'
    if not fields:
        return
    if check_group_user and not self.env.user._is_portal():
        return
    if self.env.su:
        return

    allowed_fields = self.SELF_READABLE_FIELDS if operation == 'read' else self.SELF_WRITABLE_FIELDS
    unauthorized_fields = set(fields) - allowed_fields
    if not unauthorized_fields:
        return

    unauthorized_field_list = format_list(self.env, list(unauthorized_fields))
    if operation == 'read':
        error_message = _('You cannot read the following fields on tasks: %(field_list)s', field_list=unauthorized_field_list)
    else:
        error_message = _('You cannot write on the following fields on tasks: %(field_list)s', field_list=unauthorized_field_list)
    raise AccessError(error_message)
def _determine_fields_to_fetch(self, field_names, ignore_when_in_cache=False):
    """Limit the fetched fields to the readable ones for portal users.

    The restriction does not apply in superuser mode.
    """
    restricted = not self.env.su and self.env.user._is_portal()
    if restricted:
        readable = self.SELF_READABLE_FIELDS
        field_names = [name for name in field_names if name in readable]
    return super()._determine_fields_to_fetch(field_names, ignore_when_in_cache)
def _get_portal_sudo_vals(self, vals, defaults=False):
    """Split the values a portal user creates/writes on a task.

    :param vals: dict of {field: value}, the values to create/write
    :param defaults: when set, the ``default_*`` context keys of writable
        x2many fields are added to the values written without sudo
    :return: a tuple with 2 dicts:
        - the first with the values to write without sudo
        - the second with the values to write with sudo
    """
    x2many_types = ('one2many', 'many2many')
    prefix_length = len('default_')

    vals_no_sudo = {}
    for fname, value in vals.items():
        if self._fields[fname].type in x2many_types:
            vals_no_sudo[fname] = value

    if defaults:
        for key, value in self.env.context.items():
            if not key.startswith('default_'):
                continue
            fname = key[prefix_length:]
            if fname in self.SELF_WRITABLE_FIELDS and self._fields[fname].type in x2many_types:
                vals_no_sudo[fname] = value

    vals_sudo = {fname: value for fname, value in vals.items() if fname not in vals_no_sudo}
    return vals_no_sudo, vals_sudo
def _get_portal_sudo_context(self):
    """Return the context to use when acting in sudo for a portal user.

    A context key is kept when it is ``default_project_id``, when it is
    ``default_user_ids`` with the value ``False``, when it is not a
    ``default_*`` key, or when it is the default of a writable field that is
    neither one2many nor many2many.
    """
    # Computed once instead of being rebuilt for every context key.
    non_x2many_writable_fields = {
        field for field in self.SELF_WRITABLE_FIELDS
        if self._fields[field].type not in ('one2many', 'many2many')
    }
    return {
        key: value for key, value in self.env.context.items()
        if key == 'default_project_id'
        or (key == 'default_user_ids' and value is False)
        or not key.startswith('default_')
        or key[8:] in non_x2many_writable_fields
    }
def check_field_access_rights(self, operation, field_names):
    """Apply the portal field restrictions on top of the regular check.

    With explicit ``field_names`` and a 'read' or 'write' operation, the
    fields are validated by ``_ensure_fields_are_accessible``. Without
    ``field_names``, a portal user (outside superuser mode) gets the regular
    result filtered on ``SELF_READABLE_FIELDS``.
    """
    if field_names:
        if operation in ('read', 'write'):
            self._ensure_fields_are_accessible(field_names, operation)
    elif not self.env.su and self.env.user._is_portal():
        readable = self.SELF_READABLE_FIELDS
        accessible = super().check_field_access_rights(operation, field_names)
        return [fname for fname in accessible if fname in readable]
    return super().check_field_access_rights(operation, field_names)
def _set_stage_on_project_from_task(self):
    """Link to each project the stages of its tasks that it does not list yet."""
    stage_ids_per_project = defaultdict(list)
    for task in self:
        stage, project = task.stage_id, task.project_id
        if not stage or stage in project.type_ids:
            continue
        collected_ids = stage_ids_per_project[project]
        if stage.id not in collected_ids:
            collected_ids.append(stage.id)
    for project, stage_ids in stage_ids_per_project.items():
        link_commands = [Command.link(stage_id) for stage_id in stage_ids]
        project.write({'type_ids': link_commands})
def _load_records_create(self, vals_list):
    """Complete the values of loaded records before creating them.

    Recurring tasks without ``recurrence_id`` receive the default values of
    the recurrence fields, and a ``project_id`` found in the values is put
    in the context as ``default_project_id``.
    """
    for values in vals_list:
        if values.get('recurring_task') and not values.get('recurrence_id'):
            recurrence_defaults = self.default_get(self._get_recurrence_fields())
            values.update(**recurrence_defaults)
        project_id = values.get('project_id')
        if project_id:
            self = self.with_context(default_project_id=project_id)
    return super()._load_records_create(vals_list)
def create(self, vals_list):
    """Create tasks, handling portal users, default stages and followers.

    Each dict of ``vals_list`` is completed in place before the creation:
    assignation date, name taken from ``display_name``, company and default
    stage of the project, stage dates and recurrence. For a portal user the
    fields are validated against the writable ones, the creation runs in
    sudo and the x2many values are written afterwards with the user's own
    rights.

    :param vals_list: list of dicts of values, one per task to create
    :return: the created tasks
    """
    new_context = dict(self.env.context)
    default_personal_stage = new_context.pop('default_personal_stage_type_ids', False)
    default_project_id = new_context.get("default_project_id", False)
    self = self.with_context(new_context)

    is_portal_user = self.env.user._is_portal()
    if is_portal_user:
        self.browse().check_access('create')
    default_stage = dict()
    for vals in vals_list:
        project_id = vals.get('project_id') or default_project_id
        if vals.get('user_ids'):
            vals['date_assign'] = fields.Datetime.now()
            if not (vals.get('parent_id') or project_id):
                # Task without parent nor project: the current user is kept
                # among the assignees.
                user_ids = self._fields['user_ids'].convert_to_cache(vals.get('user_ids', []), self.env['project.task'])
                if self.env.user.id not in list(user_ids) + [SUPERUSER_ID]:
                    vals['user_ids'] = [Command.set(list(user_ids) + [self.env.user.id])]
        if default_personal_stage and 'personal_stage_type_id' not in vals:
            vals['personal_stage_type_id'] = default_personal_stage[0]
        if not vals.get('name') and vals.get('display_name'):
            vals['name'] = vals['display_name']
        if is_portal_user:
            self._ensure_fields_are_accessible(vals.keys(), operation='write', check_group_user=False)

        if project_id and "company_id" not in vals:
            # The project id must be given to browse(): an empty recordset
            # would always yield a False company.
            vals["company_id"] = self.env["project.project"].browse(
                project_id
            ).company_id.id
        if not project_id and ("stage_id" in vals or self.env.context.get('default_stage_id')):
            vals["stage_id"] = False

        if project_id and "stage_id" not in vals:
            # 1) Allows keeping the batch creation of tasks
            # 2) Ensure the defaults are correct (and computed once by project),
            # by using default get (instead of _get_default_stage_id or _stage_find),
            if project_id not in default_stage:
                default_stage[project_id] = self.with_context(
                    default_project_id=project_id
                ).default_get(['stage_id']).get('stage_id')
            vals["stage_id"] = default_stage[project_id]

        # Stage change: Update date_end if folded stage and date_last_stage_update
        if vals.get('stage_id'):
            vals.update(self.update_date_end(vals['stage_id']))
            vals['date_last_stage_update'] = fields.Datetime.now()
        # recurrence
        rec_fields = vals.keys() & self._get_recurrence_fields()
        if rec_fields and vals.get('recurring_task') is True:
            rec_values = {rec_field: vals[rec_field] for rec_field in rec_fields}
            recurrence = self.env['project.task.recurrence'].create(rec_values)
            vals['recurrence_id'] = recurrence.id

    # The sudo is required for a portal user as the record creation
    # requires the read access on other models, as mail.template
    # in order to compute the field tracking
    was_in_sudo = self.env.su
    if is_portal_user:
        vals_list_no_sudo, vals_list = zip(*(self._get_portal_sudo_vals(vals, defaults=True) for vals in vals_list))
        self_no_sudo, self = self, self.sudo().with_context(self._get_portal_sudo_context())
    tasks = super(Task, self.with_context(mail_create_nosubscribe=True)).create(vals_list)
    if is_portal_user:
        for task, vals in zip(tasks.with_env(self_no_sudo.env), vals_list_no_sudo):
            task.write(vals)
    tasks._populate_missing_personal_stages()
    self._task_message_auto_subscribe_notify({task: task.user_ids - self.env.user for task in tasks})

    # in case we were already in sudo, we don't check the rights.
    if is_portal_user and not was_in_sudo:
        # since we use sudo to create tasks, we need to check
        # if the portal user could really create the tasks based on the ir rule.
        # The check has to run on the created records: on an empty
        # recordset no record rule would be evaluated.
        tasks.with_user(self.env.user).check_access('create')
    current_partner = self.env.user.partner_id

    all_partner_emails = []
    for task in tasks:
        all_partner_emails += tools.email_split(task.email_cc)
    partners = self.env['res.partner'].search([('email', 'in', all_partner_emails)])
    # Only the partners having at least one non-share user are kept.
    partner_per_email = {
        partner.email: partner
        for partner in partners
        if not all(u.share for u in partner.user_ids)
    }
    if tasks.project_id:
        tasks.sudo()._set_stage_on_project_from_task()
    for task in tasks:
        if task.project_id.privacy_visibility == 'portal':
            task._portal_ensure_token()
        for follower in task.parent_id.message_follower_ids:
            task.message_subscribe(follower.partner_id.ids, follower.subtype_ids.ids)
        if current_partner not in task.message_partner_ids:
            task.message_subscribe(current_partner.ids)
        if task.email_cc:
            partners_with_internal_user = self.env['res.partner']
            for email in tools.email_split(task.email_cc):
                new_partner = partner_per_email.get(email)
                if new_partner:
                    partners_with_internal_user |= new_partner
            if not partners_with_internal_user:
                # Nobody to notify nor to subscribe for this task.
                continue
            task._send_email_notify_to_cc(partners_with_internal_user)
            task.message_subscribe(partners_with_internal_user.ids)
    return tasks
def write(self, vals):
    """Update the tasks and apply the side effects of the change.

    Before the actual write: portal access checks, propagation of the
    milestone to the sub-tasks, stage dates and recurrence handling. After
    it: assignation date, rating mail on stage change, state adjustment for
    blocked tasks, assignment notifications and a notification to the
    followers of the new project subscribed to the task-creation subtype.

    :param vals: dict of values to write; it is modified in place
    :return: the result of the parent ``write``
    :raise UserError: when a task is set as its own parent, or when a stage
        is written on a task without project and no project is given
    """
    if len(self) == 1:
        handle_history_divergence(self, 'description', vals)
    portal_can_write = False
    project_link_per_task_id = {}
    partner_ids = []
    if self.env.user._is_portal() and not self.env.su:
        # Check if all fields in vals are in SELF_WRITABLE_FIELDS
        self._ensure_fields_are_accessible(vals.keys(), operation='write', check_group_user=False)
        self.check_access('write')
        portal_can_write = True

    if 'milestone_id' in vals:
        # WARNING: has to be done after 'project_id' vals is written on subtasks
        # Keep the requested value: the key may be removed from `vals`
        # below while the sub-tasks still have to receive it.
        milestone_id = vals['milestone_id']
        milestone = self.env['project.milestone'].browse(milestone_id)

        # 1. Task for which the milestone is unvalid -> milestone_id is reset
        if 'project_id' not in vals:
            unvalid_milestone_tasks = self.filtered(lambda task: task.project_id != milestone.project_id) if milestone_id else self.env['project.task']
        else:
            unvalid_milestone_tasks = self if not milestone_id or milestone.project_id.id != vals['project_id'] else self.env['project.task']
        valid_milestone_tasks = self - unvalid_milestone_tasks
        if unvalid_milestone_tasks:
            unvalid_milestone_tasks.write({'milestone_id': False})
            if valid_milestone_tasks:
                valid_milestone_tasks.write({'milestone_id': milestone_id})
            del vals['milestone_id']

        # 2. Parent's milestone is set to subtask with no milestone recursively
        subtasks_to_update = valid_milestone_tasks.child_ids.filtered(
            lambda task: (task not in self
                          and not task.milestone_id
                          and task.project_id == milestone.project_id
                          and task.state not in CLOSED_STATES))

        # 3. If parent and child task share the same milestone, child task's milestone is updated when the parent one is changed
        # No need to check if state is changed in vals as it won't affect the subtasks selected for update
        if 'project_id' not in vals:
            subtasks_to_update |= valid_milestone_tasks.child_ids.filtered(
                lambda task: (task not in self
                              and task.milestone_id == task.parent_id.milestone_id
                              and task.state not in CLOSED_STATES))
        else:
            subtasks_to_update |= valid_milestone_tasks.child_ids.filtered(
                lambda task: (task not in self
                              and (not task.display_in_project or task.project_id.id == vals['project_id'])
                              and task.milestone_id == task.parent_id.milestone_id
                              and task.state not in CLOSED_STATES))
        if subtasks_to_update:
            subtasks_to_update.write({'milestone_id': milestone_id})

    if vals.get('parent_id') in self.ids:
        raise UserError(_("Sorry. You can't set a task as its parent task."))

    # stage change: update date_last_stage_update
    now = fields.Datetime.now()
    if 'stage_id' in vals:
        if 'project_id' not in vals and self.filtered(lambda t: not t.project_id):
            raise UserError(_('You can only set a personal stage on a private task.'))

        vals.update(self.update_date_end(vals['stage_id']))
        vals['date_last_stage_update'] = now
    task_ids_without_user_set = set()
    if 'user_ids' in vals and 'date_assign' not in vals:
        # prepare update of date_assign after super call
        task_ids_without_user_set = {task.id for task in self if not task.user_ids}

    # recurrence fields
    rec_fields = vals.keys() & self._get_recurrence_fields()
    if rec_fields:
        rec_values = {rec_field: vals[rec_field] for rec_field in rec_fields}
        for task in self:
            if task.recurrence_id:
                task.recurrence_id.write(rec_values)
            elif vals.get('recurring_task'):
                recurrence = self.env['project.task.recurrence'].create(rec_values)
                task.recurrence_id = recurrence.id

    if not vals.get('recurring_task', True) and self.recurrence_id:
        tasks_in_recurrence = self.recurrence_id.task_ids
        self.recurrence_id.unlink()
        tasks_in_recurrence.write({'recurring_task': False})

    # The sudo is required for a portal user as the record update
    # requires the write access on others models, as rating.rating
    # in order to keep the same name than the task.
    if portal_can_write:
        self_no_sudo, self = self, self.sudo().with_context(self._get_portal_sudo_context())
        vals_no_sudo, vals = self._get_portal_sudo_vals(vals)

    # Track user_ids to send assignment notifications
    old_user_ids = {t: t.user_ids for t in self.sudo()}

    if "personal_stage_type_id" in vals and not vals['personal_stage_type_id']:
        del vals['personal_stage_type_id']

    # sends an email to the 'Task Creation' subtype subscribers
    # When project_id is changed
    if vals.get('project_id'):
        project = self.env['project.project'].browse(vals.get('project_id'))
        notification_subtype_id = self.env['ir.model.data']._xmlid_to_res_id('project.mt_project_task_new')
        partner_ids = project.message_follower_ids.filtered(lambda follower: notification_subtype_id in follower.subtype_ids.ids).partner_id.ids
        if partner_ids:
            # Remember the link to the project each task currently belongs
            # to, before the write replaces it.
            link_per_project_id = {}
            for task in self:
                if task.project_id:
                    project_link = link_per_project_id.get(task.project_id.id)
                    if not project_link:
                        project_link = link_per_project_id[task.project_id.id] = task.project_id._get_html_link(title=task.project_id.display_name)
                    project_link_per_task_id[task.id] = project_link

    if vals.get('parent_id') is False:
        vals['display_in_project'] = True

    result = super().write(vals)
    if portal_can_write:
        super(Task, self_no_sudo).write(vals_no_sudo)

    if 'user_ids' in vals:
        self._populate_missing_personal_stages()

    # user_ids change: update date_assign
    if 'user_ids' in vals:
        for task in self:
            if not task.user_ids and task.date_assign:
                task.date_assign = False
            elif 'date_assign' not in vals and task.id in task_ids_without_user_set:
                task.date_assign = now

    # rating on stage
    if 'stage_id' in vals and vals.get('stage_id'):
        self.filtered(lambda x: x.project_id.rating_active and x.project_id.rating_status == 'stage')._send_task_rating_mail(force_send=True)

    if 'state' in vals:
        # specific use case: when the blocked task goes from 'forced' done state to a not closed state, we fix the state back to waiting
        for task in self:
            if task.allow_task_dependencies:
                if task.is_blocked_by_dependences() and vals['state'] not in CLOSED_STATES and vals['state'] != '04_waiting_normal':
                    task.state = '04_waiting_normal'
            task.date_last_stage_update = now
    elif 'project_id' in vals:
        self.filtered(lambda t: t.state != '04_waiting_normal').state = '01_in_progress'

    self._task_message_auto_subscribe_notify({task: task.user_ids - old_user_ids[task] - self.env.user for task in self})

    if partner_ids:
        for task in self:
            project_link = project_link_per_task_id.get(task.id)
            if project_link:
                body = _(
                    'Task Transferred from Project %(source_project)s to %(destination_project)s',
                    source_project=project_link,
                    destination_project=self.project_id._get_html_link(title=self.project_id.display_name),
                )
            else:
                body = _('Task Converted from To-Do')
            task.message_notify(
                body=body,
                partner_ids=partner_ids,
                email_layout_xmlid='mail.mail_notification_layout',
                record_name=task.display_name,
            )
    return result
def unlink(self):
    """Delete the tasks together with all of their subtasks.

    A recurrence is deleted too when the task being removed is the one
    `_get_last_task_id_per_recurrence_id` reports as its last task.
    """
    # Add subtasks to batch of tasks to delete
    self |= self._get_all_subtasks()
    last_task_by_recurrence = self.recurrence_id._get_last_task_id_per_recurrence_id()
    for record in self:
        recurrence = record.recurrence_id
        if last_task_by_recurrence.get(recurrence.id) == record.id:
            recurrence.unlink()
    return super().unlink()
def update_date_end(self, stage_id):
    """Return the `date_end` value to write for a task moved to `stage_id`.

    :param stage_id: id of a `project.task.type` record
    :return: dict whose `date_end` is the current datetime when the stage is
        folded, and False otherwise
    """
    stage = self.env['project.task.type'].browse(stage_id)
    date_end = fields.Datetime.now() if stage.fold else False
    return {'date_end': date_end}
def _search_on_comodel(self, domain, field, comodel, additional_domain=None):
    """Search the `comodel` records targeted by the leaves of `domain` on `field`.

    This method is called by `group_expand` methods, whose purpose is to add empty
    groups to the `read_group` (which otherwise returns groups containing records
    that match the domain). When specifically filtering on a comodel's field, the
    result of the `read_group` should contain all matching groups. However, if the
    search isn't filtered on any comodel's field, the result shouldn't be affected,
    which explains why an empty recordset is returned when no leaf targets `field`.

    :param domain: domain of the current search
    :param field: name of the field pointing to `comodel`
    :param comodel: name of the model to search on
    :param additional_domain: optional domain AND-ed with the filtered leaves
    :return: recordset of `comodel`, empty when `domain` does not filter on `field`
    """
    def _change_operator(domain):
        # Every input leaf must produce exactly one output leaf: emitting zero or
        # two leaves would shift the operands of the prefix operators kept in
        # the filtered domain.
        new_domain = []
        for dom in domain:
            if len(dom) != 3:
                new_domain.append(dom)
                continue
            _left, op, value = dom
            if op == "child_of":
                op = "ilike"
            if isinstance(value, int):
                # A single id (booleans included) is expressed as a membership test.
                if op == "=":
                    op = "in"
                elif op == "!=":
                    op = "not in"
                new_domain.append(("id", op, [value]))
            elif isinstance(value, (list, tuple)) and all(isinstance(val, int) for val in value):
                new_domain.append(("id", op, value))
            else:
                # Strings, lists of strings and mixed lists are matched on the name.
                new_domain.append(("name", op, value))
        return new_domain

    tracked_fields = [field, f"{field}.id", f"{field}.name"]
    filtered_domain = filter_domain_leaf(
        domain,
        lambda field_to_check: field_to_check in tracked_fields,
        {
            field: "name",
            f"{field}.id": "id",
            f"{field}.name": "name",
        },
    )
    filtered_domain = _change_operator(filtered_domain)
    if not filtered_domain:
        return self.env[comodel]
    if additional_domain:
        filtered_domain = expression.AND([filtered_domain, additional_domain])
    return self.env[comodel].search(filtered_domain)
# ---------------------------------------------------
# Subtasks
# ---------------------------------------------------
@api.depends('parent_id.partner_id', 'project_id')
def _compute_partner_id(self):
    """Compute the partner_id when the tasks have no partner_id.

    Use the project partner_id if any, or else the parent task partner_id.
    A task without project nor parent task loses its partner.
    """
    for task in self:
        if task.partner_id and not (task.project_id or task.parent_id):
            task.partner_id = False
            # Nothing to inherit from: do not look for a default partner.
            continue
        if not task.partner_id:
            task.partner_id = self._get_default_partner_id(task.project_id, task.parent_id)
@api.depends('project_id')
def _compute_milestone_id(self):
    """Reset the milestone of the tasks whose milestone belongs to another project.

    The milestone of the parent task is used when the parent task is in the
    same project as the task, otherwise the milestone is emptied.
    """
    for record in self:
        if record.milestone_id.project_id == record.project_id:
            continue
        parent = record.parent_id
        record.milestone_id = parent.project_id == record.project_id and parent.milestone_id
def _compute_has_late_and_unreached_milestone(self):
    """Flag the tasks linked to an unreached milestone whose deadline is today or earlier.

    Tasks whose `allow_milestones` is falsy are never flagged.
    """
    if not any(task.allow_milestones for task in self):
        # No task uses milestones: set the flag and skip the milestone query.
        self.has_late_and_unreached_milestone = False
        return
    # NOTE(review): the deadline is compared with `<=` here while
    # `_search_has_late_and_unreached_milestone` uses a strict `<` - confirm which one is intended.
    late_milestones = self.env['project.milestone'].sudo()._search([  # sudo is needed for the portal user in Project Sharing.
        ('id', 'in', self.milestone_id.ids),
        ('is_reached', '=', False),
        ('deadline', '<=', fields.Date.today()),
    ])
    for task in self:
        task.has_late_and_unreached_milestone = task.allow_milestones and task.milestone_id.id in late_milestones
def _search_has_late_and_unreached_milestone(self, operator, value):
    """Return the domain matching the tasks having (or not) a late and unreached milestone.

    :param operator: '=' or '!='
    :param value: boolean the field is compared to
    :raises NotImplementedError: for any other operator or a non-boolean value
    :return: search domain on `project.task`
    """
    if operator not in ('=', '!=') or not isinstance(value, bool):
        raise NotImplementedError(_(
            "The search does not support operator %(operator)s or value %(value)s.",
            operator=operator,
            value=value,
        ))
    domain = [
        ('allow_milestones', '=', True),
        ('milestone_id', '!=', False),
        ('milestone_id.is_reached', '=', False),
        ('milestone_id.deadline', '!=', False), ('milestone_id.deadline', '<', fields.Date.today())
    ]
    if (operator == '!=' and value) or (operator == '=' and not value):
        # The leaves above are implicitly AND-ed: make the operators explicit first so
        # that the negation covers the whole condition and not only its first leaf.
        domain = [expression.NOT_OPERATOR] + expression.normalize_domain(domain)
        domain = expression.distribute_not(domain)
    return domain
# ---------------------------------------------------
# Mail gateway
# ---------------------------------------------------
def _notify_by_email_prepare_rendering_context(self, message, msg_vals=False, model_description=False,
                                               force_email_company=False, force_email_lang=False):
    """Add the stage of the task, if any, to the subtitles of the rendering context."""
    render_context = super()._notify_by_email_prepare_rendering_context(
        message, msg_vals, model_description=model_description,
        force_email_company=force_email_company, force_email_lang=force_email_lang
    )
    if self.stage_id:
        render_context['subtitles'].append(_('Stage: %s', self.stage_id.name))
    return render_context
def _send_email_notify_to_cc(self, partners_to_notify):
    """Notify each given partner that they have been invited to follow this task.

    Nothing is sent when the `project.task_invitation_follower` template is not found.

    :param partners_to_notify: partners to notify, one message is sent per partner
    """
    self.ensure_one()
    template_id = self.env['ir.model.data']._xmlid_to_res_id('project.task_invitation_follower', raise_if_not_found=False)
    if not template_id:
        return
    task_model_description = self.env['ir.model']._get(self._name).display_name
    values = {
        'object': self,
    }
    for partner in partners_to_notify:
        values['partner_name'] = partner.name
        assignation_msg = self.env['ir.qweb']._render('project.task_invitation_follower', values, minimal_qcontext=True)
        self.message_notify(
            subject=_('You have been invited to follow %s', self.display_name),
            body=assignation_msg,
            partner_ids=partner.ids,
            record_name=self.display_name,
            email_layout_xmlid='mail.mail_notification_layout',
            model_description=task_model_description,
            mail_auto_delete=True,
        )
def _task_message_auto_subscribe_notify(self, users_per_task):
    """Send the assignation notification to the users newly assigned to tasks.

    Nothing is sent when the `mail_auto_subscribe_no_notify` context key is set or
    when the `project.project_message_user_assigned` template is not found.

    :param users_per_task: dict mapping each task to the users to notify
    """
    if self.env.context.get('mail_auto_subscribe_no_notify'):
        return
    # Utility method to send assignation notification upon writing/creation.
    template_id = self.env['ir.model.data']._xmlid_to_res_id('project.project_message_user_assigned', raise_if_not_found=False)
    if not template_id:
        return
    task_model_description = self.env['ir.model']._get(self._name).display_name
    for task, users in users_per_task.items():
        if not users:
            continue
        values = {
            'object': task,
            'model_description': task_model_description,
            'access_link': task._notify_get_action_link('view'),
        }
        for user in users:
            values.update(assignee_name=user.sudo().name)
            assignation_msg = self.env['ir.qweb']._render('project.project_message_user_assigned', values, minimal_qcontext=True)
            assignation_msg = self.env['mail.render.mixin']._replace_local_links(assignation_msg)
            task.message_notify(
                subject=_('You have been assigned to %s', task.display_name),
                body=assignation_msg,
                partner_ids=user.partner_id.ids,
                record_name=task.display_name,
                email_layout_xmlid='mail.mail_notification_layout',
                model_description=task_model_description,
                mail_auto_delete=False,
            )
def _message_auto_subscribe_followers(self, updated_values, default_subtype_ids):
    """Return the followers to subscribe for the users set in `updated_values`.

    :param updated_values: values written on the tasks
    :param default_subtype_ids: subtypes given to every new follower
    :return: list of `(partner_id, default_subtype_ids, False)` tuples, empty
        when `user_ids` is not part of the updated values
    """
    if 'user_ids' not in updated_values:
        return []
    # Since the changes to user_ids becoming a m2m, the default implementation of this function
    # could not work anymore, override the function to keep the functionality.
    new_followers = []
    # Normalize input to tuple of ids
    value = self._fields['user_ids'].convert_to_cache(updated_values.get('user_ids', []), self.env['project.task'], validate=False)
    users = self.env['res.users'].browse(value)
    for user in users:
        try:
            if user.partner_id:
                # The you have been assigned notification is handled separately
                new_followers.append((user.partner_id.id, default_subtype_ids, False))
        except Exception:
            # Best effort: a user whose partner cannot be read is skipped
            # instead of blocking the subscription of the other users.
            continue
    return new_followers
def _track_template(self, changes):
    """Add the mail template of the stage to the tracking templates on stage change.

    The stage of the first task of `self` is the one looked at.
    """
    templates = super(Task, self)._track_template(changes)
    first_task = self[0]
    if 'stage_id' not in changes:
        return templates
    stage_template = first_task.stage_id.mail_template_id
    if stage_template:
        send_options = {
            'auto_delete_keep_log': False,
            'subtype_id': self.env['ir.model.data']._xmlid_to_res_id('mail.mt_note'),
            'email_layout_xmlid': 'mail.mail_notification_light'
        }
        templates['stage_id'] = (stage_template, send_options)
    return templates
def _creation_subtype(self):
    """Return the record referenced by `project.mt_task_new`."""
    subtype_xmlid = 'project.mt_task_new'
    return self.env.ref(subtype_xmlid)
def _creation_message(self):
    """Return the creation message of the task, mentioning its project when it has one."""
    self.ensure_one()
    if not self.project_id:
        return _('A new task has been created and is not part of any project.')
    return _(
        'A new task has been created in the "%(project_name)s" project.',
        project_name=self.project_id.display_name,
    )
def _track_subtype(self, init_values):
    """Return the message subtype to use for the tracked changes.

    A stage change takes precedence over a state change; any other change
    is left to the parent implementation.

    :param init_values: values of the tracked fields before the change
    """
    self.ensure_one()
    mail_message_subtype_per_state = {
        '1_done': 'project.mt_task_done',
        '1_canceled': 'project.mt_task_canceled',
        '01_in_progress': 'project.mt_task_in_progress',
        '03_approved': 'project.mt_task_approved',
        '02_changes_requested': 'project.mt_task_changes_requested',
        '04_waiting_normal': 'project.mt_task_waiting',
    }
    if 'stage_id' in init_values:
        return self.env.ref('project.mt_task_stage')
    elif 'state' in init_values and self.state in mail_message_subtype_per_state:
        return self.env.ref(mail_message_subtype_per_state[self.state])
    return super(Task, self)._track_subtype(init_values)
def _mail_get_message_subtypes(self):
    """Remove the subtypes of features that are not enabled for the task.

    The rating subtype is removed when ratings are not active on the project.
    For a single task, the waiting subtype is removed when task dependencies
    are disabled on its project or, without project, when the current user is
    not in the task dependencies group.
    """
    subtypes = super()._mail_get_message_subtypes()
    if not self.project_id.rating_active:
        subtypes -= self.env.ref('project.mt_task_rating')
    if len(self) != 1:
        return subtypes
    waiting_subtype = self.env.ref('project.mt_task_waiting')
    if self.project_id:
        dependencies_disabled = not self.project_id.allow_task_dependencies
    else:
        dependencies_disabled = not self.env.user.has_group('project.group_project_task_dependencies')
    if dependencies_disabled and waiting_subtype in subtypes:
        subtypes -= waiting_subtype
    return subtypes
def _notify_get_recipients_groups(self, message, model_description, msg_vals=None):
    """ Handle project users and managers recipients that can assign
    tasks and create new one directly from notification emails. Also give
    access button to portal users and portal customers. If they are notified
    they should probably have access to the document. """
    groups = super()._notify_get_recipients_groups(
        message, model_description, msg_vals=msg_vals
    )
    if not self:
        return groups

    self.ensure_one()

    project_user_group_id = self.env.ref('project.group_project_user').id
    new_group = ('group_project_user', lambda pdata: pdata['type'] == 'user' and project_user_group_id in pdata['groups'], {})
    groups = [new_group] + groups

    if self.project_privacy_visibility == 'portal':
        groups.insert(0, (
            'allowed_portal_users',
            lambda pdata: pdata['type'] == 'portal',
            {
                'active': True,
                'has_button_access': True,
            }
        ))
    portal_privacy = self.project_id.privacy_visibility == 'portal'
    for group_name, _group_method, group_data in groups:
        # `and` binds tighter than `or`: the parentheses only make that explicit.
        if group_name in ('customer', 'user') or (group_name == 'portal_customer' and not portal_privacy):
            group_data['has_button_access'] = False
        elif group_name == 'portal_customer' and portal_privacy:
            group_data['has_button_access'] = True

    return groups
def _notify_get_reply_to(self, default=None):
    """ Override to set alias of tasks to their project if any. """
    project_aliases = self.sudo().mapped('project_id')._notify_get_reply_to(default=default)
    reply_to_per_task = {}
    for record in self:
        reply_to_per_task[record.id] = project_aliases.get(record.project_id.id)
    # Tasks without project fall back on the parent implementation.
    without_project = self.filtered(lambda rec: not rec.project_id)
    if without_project:
        reply_to_per_task.update(super(Task, without_project)._notify_get_reply_to(default=default))
    return reply_to_per_task
def _ensure_personal_stages(self):
    """Create the default personal stages of the current user when they have none."""
    user = self.env.user
    ProjectTaskTypeSudo = self.env['project.task.type'].sudo()
    # In the case no stages have been found, we create the default stages for the user
    if not ProjectTaskTypeSudo.search_count([('user_id', '=', user.id)], limit=1):
        ProjectTaskTypeSudo.with_context(lang=user.lang, default_project_id=False).create(
            self.with_context(lang=user.lang)._get_default_personal_stage_create_vals(user.id)
        )
def email_split(self, msg):
    """Return the `to` and `cc` addresses of `msg` that are not a project alias.

    :param msg: dict of the parsed email, read through its `to` and `cc` keys
    :return: list of email addresses
    """
    recipients = ','.join((msg.get('to') or '', msg.get('cc') or ''))
    addresses = tools.email_split(recipients)
    # check left-part is not already an alias
    project_aliases = self.mapped('project_id.alias_name')
    return [address for address in addresses if address.partition('@')[0] not in project_aliases]
def message_new(self, msg, custom_values=None):
    """ Overrides mail_thread message_new that is called by the mailgateway
    through message_process.
    This override updates the document according to the email.

    :param msg: dict of the parsed email
    :param custom_values: optional values overriding the defaults built from the email
    :return: the created task
    """
    # remove default author when going through the mail gateway. Indeed we
    # do not want to explicitly set user_id to False; however we do not
    # want the gateway user to be responsible if no other responsible is
    # found.
    create_context = dict(self.env.context or {})
    create_context['default_user_ids'] = False
    create_context['mail_notify_author'] = True  # Allows sending stage updates to the author
    if custom_values is None:
        custom_values = {}
    # Auto create partner if not existent when the task is created from email
    if not msg.get('author_id') and msg.get('email_from'):
        msg['author_id'] = self.env['res.partner'].create({
            'email': msg['email_from'],
            'name': msg['email_from'],
        }).id

    defaults = {
        'name': msg.get('subject') or _("No Subject"),
        'allocated_hours': 0.0,
        'partner_id': msg.get('author_id'),
    }
    defaults.update(custom_values)

    task = super(Task, self.with_context(create_context)).message_new(msg, custom_values=defaults)
    # Subscribe the known partners among the recipients of the email.
    email_list = task.email_split(msg)
    partner_ids = [p.id for p in self.env['mail.thread']._mail_find_partner_from_emails(email_list, records=task, force_create=False) if p]
    task.message_subscribe(partner_ids)
    return task
def message_update(self, msg, update_vals=None):
    """ Override to update the task according to the email. """
    recipients = self.email_split(msg)
    partners = self.env['mail.thread']._mail_find_partner_from_emails(recipients, records=self, force_create=False)
    # Entries for which no partner was found are falsy and skipped.
    self.message_subscribe([partner.id for partner in partners if partner])
    return super(Task, self).message_update(msg, update_vals=update_vals)
def _message_get_suggested_recipients(self):
    """Suggest the customer of the task, if any, as recipient."""
    recipients = super()._message_get_suggested_recipients()
    customer = self.partner_id
    if not customer:
        return recipients
    if customer.email:
        reason = _('Customer Email')
    else:
        reason = _('Customer')
    self._message_add_suggested_recipient(recipients, partner=customer, reason=reason)
    return recipients
def _notify_by_email_get_headers(self, headers=None):
    """Add the project and the tags of the task to the headers of the notification email.

    The project is put first in the comma-separated `X-Odoo-Objects` header and
    the tag names are listed in `X-Odoo-Tags`.
    """
    headers = super(Task, self)._notify_by_email_get_headers(headers=headers)
    if self.project_id:
        current_objects = [h for h in headers.get('X-Odoo-Objects', '').split(',') if h]
        # No separator in the inserted item: the join below already adds it,
        # a trailing one would leave an empty entry in the header.
        current_objects.insert(0, 'project.project-%s' % self.project_id.id)
        headers['X-Odoo-Objects'] = ','.join(current_objects)
    if self.tag_ids:
        headers['X-Odoo-Tags'] = ','.join(self.tag_ids.mapped('name'))
    return headers
def _message_post_after_hook(self, message, msg_vals):
    """Update the task from the message that has just been posted.

    The first image attached to the message becomes the displayed image of a
    task that has none, and the body of the creation email sent by the customer
    is used as description when the task has no description.
    """
    if message.attachment_ids and not self.displayed_image_id:
        # Mimetypes have the form `image/<subtype>`: comparing with the bare
        # 'image' string can never match.
        image_attachments = message.attachment_ids.filtered(
            lambda a: (a.mimetype or '').startswith('image/')
        )
        if image_attachments:
            self.displayed_image_id = image_attachments[0]

    # use the sanitized body of the email from the message thread to populate the task's description
    if (
        not self.description
        and message.subtype_id == self._creation_subtype()
        and self.partner_id == message.author_id
        and msg_vals.get('message_type') == 'email'
    ):
        self.description = message.body
    return super(Task, self)._message_post_after_hook(message, msg_vals)
def _get_projects_to_make_billable_domain(self, additional_domain=None):
    """Return a domain requiring a partner, AND-ed with `additional_domain` if given."""
    partner_set_domain = [('partner_id', '!=', False)]
    return expression.AND([partner_set_domain, additional_domain or []])
def _get_all_subtasks(self):
    """Return every subtask listed by `_get_subtask_ids_per_task_id` as one recordset."""
    subtask_ids = set()
    for ids in self._get_subtask_ids_per_task_id().values():
        subtask_ids.update(ids)
    return self.browse(subtask_ids)
def _get_subtask_ids_per_task_id(self):
    """Return the ids of all the subtasks (direct or not) of each task of `self`.

    :return: dict mapping each id of `self` to the list of ids of its subtasks;
        empty dict when `self` is empty
    """
    if not self:
        return {}

    # One distinct list per task: a list shared between the keys would leak
    # any in-place change made by a caller to all the tasks without subtask.
    res = {task_id: [] for task_id in self._ids}
    if all(self._ids):
        self.env.cr.execute(
            """
                WITH RECURSIVE task_tree
                AS (
                    SELECT id, id as supertask_id
                      FROM project_task
                     WHERE id IN %(ancestor_ids)s
                     UNION
                        SELECT t.id, tree.supertask_id
                          FROM project_task t
                          JOIN task_tree tree
                            ON tree.id = t.parent_id
                           AND t.active in (TRUE, %(active)s)
                         WHERE t.parent_id IS NOT NULL
                ) SELECT supertask_id, ARRAY_AGG(id)
                    FROM task_tree
                   WHERE id != supertask_id
                GROUP BY supertask_id
            """,
            {
                "ancestor_ids": tuple(self.ids),
                "active": self._context.get('active_test', True),
            }
        )
        res.update(dict(self.env.cr.fetchall()))
    else:
        # At least one id is falsy: the query cannot be used, walk `child_ids` instead.
        res.update({
            task.id: task._get_subtasks_recursively().ids
            for task in self
        })
    return res
def _get_subtasks_recursively(self):
    """Return the child tasks of `self` followed by their own subtasks, level by level."""
    direct_subtasks = self.child_ids
    if direct_subtasks:
        return direct_subtasks + direct_subtasks._get_subtasks_recursively()
    return self.env['project.task']
def action_open_parent_task(self):
    """Return the window action opening the form view of the parent task."""
    return {
        'name': _('Parent Task'),
        'view_mode': 'form',
        'res_model': 'project.task',
        'res_id': self.parent_id.id,
        'type': 'ir.actions.act_window',
        'context': self._context
    }
def action_project_sharing_view_parent_task(self):
    """Open the parent task from the project sharing views.

    When the current user is a portal user and the parent task is in another
    project, the user is sent to a portal URL of the parent task if they can read
    that project. Otherwise the parent task is opened in the project sharing form view.
    """
    if self.parent_id.project_id != self.project_id and self.env.user._is_portal():
        project = self.parent_id.project_id._filtered_access('read')
        if project:
            url = f"/my/projects/{self.parent_id.project_id.id}/task/{self.parent_id.id}"
            if project._check_project_sharing_access():
                url = f"/my/projects/{self.parent_id.project_id.id}?task_id={self.parent_id.id}"
            return {
                "name": "Portal Parent Task",
                "type": "ir.actions.act_url",
                "url": url,
            }
        elif self.display_parent_task_button:
            return self.parent_id.get_portal_url()
        # The portal user has no access to the parent task, so normally the button should be invisible.
        return {}

    action = self.with_context({
        'search_view_ref': 'project.project_sharing_project_task_view_search',
    }).action_open_parent_task()
    action['views'] = [(self.env.ref('project.project_sharing_project_task_view_form').id, 'form')]
    action['search_view_id'] = self.env.ref("project.project_sharing_project_task_view_search").id
    return action
# ------------
# Actions
# ------------
def action_open_task(self):
    """Return the window action opening the form view of the task."""
    return {
        'view_mode': 'form',
        'res_model': 'project.task',
        'res_id': self.id,
        'type': 'ir.actions.act_window',
        'context': self._context
    }
def action_project_sharing_open_task(self):
    """Return the action opening the task in the project sharing form view."""
    sharing_form_view = self.env.ref('project.project_sharing_project_task_view_form')
    return {
        **self.action_open_task(),
        'views': [[sharing_form_view.id, 'form']],
    }
def action_project_sharing_open_subtasks(self):
    """Open the subtasks of the task from the project sharing views.

    When all the subtasks are in the project of the task, the project sharing
    window action is returned (in form view for a single subtask). Otherwise
    a URL action is returned.
    """
    self.ensure_one()
    subtasks = self.env['project.task'].search([('id', 'child_of', self.id), ('id', '!=', self.id)])
    if subtasks.project_id == self.project_id:
        action = self.env['ir.actions.act_window']._for_xml_id('project.project_sharing_project_task_action_sub_task')
        if len(subtasks) == 1:
            action['view_mode'] = 'form'
            action['views'] = [(view_id, view_type) for view_id, view_type in action['views'] if view_type == 'form']
            action['res_id'] = subtasks.id
        return action
    return {
        'name': 'Portal Sub-tasks',
        'type': 'ir.actions.act_url',
        'url': f'/my/projects/{self.project_id.id}/task/{self.id}/subtasks' if len(subtasks) > 1 else subtasks.get_portal_url(query_string='project_sharing=1'),
    }
def action_project_sharing_open_blocking(self):
    """Return the project sharing action listing the tasks in `dependent_ids`.

    The action is restricted to the form view when there is only one such task.
    """
    self.ensure_one()
    dependent_tasks = self.dependent_ids
    action = self.env['ir.actions.act_window']._for_xml_id('project.project_sharing_project_task_action_blocking_tasks')
    if len(dependent_tasks) != 1:
        return action
    form_views = [(view_id, view_type) for view_id, view_type in action['views'] if view_type == 'form']
    action['view_mode'] = 'form'
    action['views'] = form_views
    action['res_id'] = dependent_tasks.id
    return action
def action_dependent_tasks(self):
    """Return the window action listing the tasks having this task in `depend_on_ids`."""
    self.ensure_one()
    return {
        'res_model': 'project.task',
        'type': 'ir.actions.act_window',
        'context': {**self._context, 'default_depend_on_ids': [Command.link(self.id)], 'show_project_update': False, 'search_default_open_tasks': True},
        'domain': [('depend_on_ids', '=', self.id)],
        'name': _('Dependent Tasks'),
        'view_mode': 'list,form,kanban,calendar,pivot,graph,activity',
    }
def action_recurring_tasks(self):
    """Return the window action listing the tasks sharing a recurrence with `self`.

    Creation is disabled from that action.
    """
    return {
        'name': _('Tasks in Recurrence'),
        'type': 'ir.actions.act_window',
        'res_model': 'project.task',
        'view_mode': 'list,form,kanban,calendar,pivot,graph,activity',
        'context': {'create': False},
        'domain': [('recurrence_id', 'in', self.recurrence_id.ids)],
    }
def action_project_sharing_recurring_tasks(self):
    """Open the tasks of the recurrence of the task from the project sharing views."""
    self.ensure_one()
    recurrent_tasks = self.env['project.task'].search([('recurrence_id', 'in', self.recurrence_id.ids)])
    # If all the recurrent tasks are in the same project, open the list view in sharing mode.
    if recurrent_tasks.project_id == self.project_id:
        action = self.env['ir.actions.act_window']._for_xml_id('project.project_sharing_project_task_recurring_tasks_action')
        action.update({
            'context': {'default_project_id': self.project_id.id},
            'domain': [
                ('project_id', '=', self.project_id.id),
                ('recurrence_id', 'in', self.recurrence_id.ids)
            ]
        })
        return action
    # If at least one recurrent task belong to another project, open the portal page
    return {
        'name': 'Portal Recurrent Tasks',
        'type': 'ir.actions.act_url',
        'url': f'/my/projects/{self.project_id.id}/task/{self.id}/recurrent_tasks',
    }
def action_open_ratings(self):
    """Return the action listing the ratings of the task.

    The single rating is opened in its form view when `rating_count` is 1.
    """
    self.ensure_one()
    action = self.env['ir.actions.act_window']._for_xml_id('project.rating_rating_action_task')
    if self.rating_count != 1:
        return action
    action['view_mode'] = 'form'
    action['res_id'] = self.rating_ids[0].id
    action['views'] = [[self.env.ref('project.rating_rating_view_form_project').id, 'form']]
    return action
def action_unlink_recurrence(self):
    """Unmark all the tasks of the recurrence as recurring, then delete the recurrence."""
    recurrence = self.recurrence_id
    recurrence.task_ids.recurring_task = False
    recurrence.unlink()
def action_convert_to_subtask(self):
    """Open the conversion form for a task with a project.

    A task without project gets a danger notification instead.
    """
    self.ensure_one()
    if self.project_id:
        return {
            'name': _('Convert to Task/Sub-Task'),
            'type': 'ir.actions.act_window',
            'res_model': 'project.task',
            'res_id': self.id,
            'views': [(self.env.ref('project.project_task_convert_to_subtask_view_form', False).id, 'form')],
            'target': 'new',
        }
    return {
        'type': 'ir.actions.client',
        'tag': 'display_notification',
        'params': {
            'type': 'danger',
            'message': _('Private tasks cannot be converted into sub-tasks. Please set a project on the task to gain access to this feature.'),
        }
    }
def action_archive(self):
    """Archive the tasks along with their subtasks that are not displayed in the project.

    The subtasks of `self` that are not displayed in the project get displayed
    in it before being archived.
    """
    hidden_children = self.child_ids.filtered(lambda subtask: not subtask.display_in_project)
    if hidden_children:
        hidden_children.action_archive()
    hidden_subtasks = self.filtered(lambda task: task.parent_id and not task.display_in_project)
    hidden_subtasks.display_in_project = True
    return super().action_archive()
# ---------------------------------------------------
# Rating business
# ---------------------------------------------------
def _send_task_rating_mail(self, force_send=False):
    """Send the rating request of the stage to the customer of each task.

    Tasks without rating template on their stage, without partner, or whose
    partner is the partner of the current user are skipped.

    :param force_send: forwarded to `rating_send_request`
    """
    for record in self:
        template = record.stage_id.rating_template_id
        customer = record.partner_id
        if not template or not customer or customer == self.env.user.partner_id:
            continue
        record.rating_send_request(template, lang=record.partner_id.lang, force_send=force_send)
def _rating_get_partner(self):
    """Fall back on the partner of the project when the parent implementation finds none."""
    partner = super(Task, self)._rating_get_partner()
    if partner:
        return partner
    return self.project_id.partner_id or partner
def rating_apply(self, rate, token=None, rating=None, feedback=None,
                 subtype_xmlid=None, notify_delay_send=False):
    """Apply the rating, then update the task state when the stage validates automatically.

    With `auto_validation_state` set on the stage, the task becomes approved when
    the rating reaches `RATING_LIMIT_SATISFIED`, and changes are requested otherwise.
    """
    rating = super(Task, self).rating_apply(
        rate, token=token, rating=rating, feedback=feedback,
        subtype_xmlid=subtype_xmlid, notify_delay_send=notify_delay_send)
    stage = self.stage_id
    if stage and stage.auto_validation_state:
        if rating.rating >= rating_data.RATING_LIMIT_SATISFIED:
            new_state = '03_approved'
        else:
            new_state = '02_changes_requested'
        self.write({'state': new_state})
    return rating
def _rating_apply_get_default_subtype_id(self):
    """Return the database id behind the `project.mt_task_rating` xmlid."""
    model_data = self.env['ir.model.data']
    return model_data._xmlid_to_res_id("project.mt_task_rating")
def _rating_get_parent_field_name(self):
    """Return the name of the field used as rating parent: the project field."""
    parent_field_name = 'project_id'
    return parent_field_name
def _rating_get_operator(self):
    """ Overwrite since we have user_ids and not user_id """
    def has_single_user_with_partner(task):
        return len(task.user_ids) == 1 and task.user_ids.partner_id

    operators = self.filtered(has_single_user_with_partner).user_ids.partner_id
    return operators or self.env['res.partner']
# ---------------------------------------------------
# Privacy
# ---------------------------------------------------
def _unsubscribe_portal_users(self):
    """Unsubscribe the follower partners matching the `user_ids.share` filter."""
    shared_partners = self.message_partner_ids.filtered('user_ids.share')
    self.message_unsubscribe(partner_ids=shared_partners.ids)
def get_unusual_days(self, date_from, date_to=None):
    """Return the unusual days of the calendar of the current company over a period.

    :param date_from: first day of the period
    :param date_to: last day of the period; when not given, the period is
        limited to `date_from`
    :return: result of `_get_unusual_days` on the company calendar
    """
    calendar = self.env.company.resource_calendar_id
    first_day = fields.Date.from_string(date_from)
    # `date_to` is optional in the signature: fall back on a one-day period
    # instead of failing in `datetime.combine` on a missing date.
    last_day = fields.Date.from_string(date_to) if date_to else first_day
    return calendar._get_unusual_days(
        datetime.combine(first_day, time.min).replace(tzinfo=UTC),
        datetime.combine(last_day, time.max).replace(tzinfo=UTC)
    )
def action_redirect_to_project_task_form(self):
    """Return the URL action opening the task in a new tab, under the all tasks menu."""
    menu_id = self.env.ref('project.menu_project_management_all_tasks').id
    return {
        'type': 'ir.actions.act_url',
        'url': f"/odoo/1/action-project.act_project_project_2_project_task_all/{self.id}?menu_id={menu_id}",
        'target': 'new',
    }
def read_group(self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True):
    """Allow grouping by `personal_stage_type_id` by grouping on `personal_stage_type_ids`."""
    # A read_group can not be performed if records are grouped by personal_stage_type_id as it is a computed field.
    # personal_stage_type_ids behaves like a M2O from the point of view of the user, we therefore use this field instead.
    use_personal_stages = 'personal_stage_type_id' in groupby and (not lazy or groupby[0] == 'personal_stage_type_id')
    if not use_personal_stages:
        return super().read_group(domain, fields, groupby, offset, limit, orderby, lazy)
    # limitation: problem when both personal_stage_type_id and personal_stage_type_ids appear in read_group, but this has no functional utility
    groupby = ["personal_stage_type_ids" if field == "personal_stage_type_id" else field for field in groupby]
    groups = super().read_group(domain, fields, groupby, offset, limit, orderby, lazy)
    # Expose the result under the field name that was requested.
    for group in groups:
        group['personal_stage_type_id'] = group.pop('personal_stage_type_ids', False)
        group['personal_stage_type_id_count'] = group.pop('personal_stage_type_ids_count', 0)
    return groups
# ---------------------------------------------------
# Project Sharing
# ---------------------------------------------------
def project_sharing_toggle_is_follower(self):
    """Subscribe the current user to the task, or unsubscribe them if they already follow it.

    Write access on the task is checked first.

    :return: True when the user follows the task after the call, False otherwise
    """
    self.ensure_one()
    self.check_access('write')
    was_following = self.message_is_follower
    if was_following:
        self.sudo().message_unsubscribe(self.env.user.partner_id.ids)
        return False
    self.sudo().message_subscribe(self.env.user.partner_id.ids)
    return True
@api.depends('subtask_count', 'closed_subtask_count')
def _compute_subtask_completion_percentage(self):
    """Compute the ratio of closed subtasks over the number of subtasks."""
    for record in self:
        total = record.subtask_count
        # A falsy count is kept as the value, which avoids the division by zero.
        record.subtask_completion_percentage = total and record.closed_subtask_count / total
def _get_thread_with_access(self, thread_id, mode="read", **kwargs):
    """Use the token given by the project sharing access check, when a `project_sharing_id` is passed."""
    sharing_id = kwargs.get("project_sharing_id")
    if sharing_id:
        access_token = ProjectSharingChatter._check_project_access_and_get_token(
            self, sharing_id, self._name, thread_id, kwargs.get("token")
        )
        if access_token:
            kwargs["token"] = access_token
    return super()._get_thread_with_access(thread_id, mode, **kwargs)
def get_mention_suggestions(self, search, limit=8):
    """Return the 'limit'-first followers of the given task or followers of its project matching
    a 'search' string as a list of partner data (returned by `_to_store()`).
    See similar method for all partners `get_mention_suggestions()`.

    An empty dict is returned when the task has no project or when the project
    sharing and thread access checks on the project do not pass.
    """
    self.ensure_one()
    project = self.project_id
    if not (
        project
        and project._check_project_sharing_access()
        and project._get_thread_with_access(project.id)
    ):
        return {}
    # sudo: mail.followers - reading message_follower_ids on accessible task/project is allowed
    followers = project.sudo().message_follower_ids | self.sudo().message_follower_ids
    domain = expression.AND([
        self.env["res.partner"]._get_mention_suggestions_domain(search),
        [("id", "in", followers.partner_id.ids)],
    ])
    partners = self.env["res.partner"].sudo()._search_mention_suggestions(domain, limit)
    return Store(partners).get_result()