Odoo18-Base/addons/hr_recruitment/models/hr_recruitment.py
2025-03-10 11:12:23 +07:00

780 lines
37 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
from random import randint
from odoo import api, fields, models, tools, SUPERUSER_ID
from odoo.exceptions import AccessError, UserError
from odoo.tools import Query
from odoo.tools.translate import _
from dateutil.relativedelta import relativedelta
from lxml import etree
# Choices of the applicant ``priority`` ("Evaluation") selection field, as
# (stored key, label) pairs listed from the lowest to the highest rating.
AVAILABLE_PRIORITIES = [
    ('0', 'Normal'),
    ('1', 'Good'),
    ('2', 'Very Good'),
    ('3', 'Excellent'),
]
class RecruitmentSource(models.Model):
    """Source of applicants for a job, optionally tied to a mail alias."""
    _name = "hr.recruitment.source"
    _description = "Source of Applicants"
    _inherit = ['utm.source.mixin']

    email = fields.Char(related='alias_id.display_name', string="Email", readonly=True)
    # Truthiness of the "mail.catchall.domain" system parameter (see compute).
    has_domain = fields.Char(compute='_compute_has_domain')
    job_id = fields.Many2one('hr.job', "Job", ondelete='cascade')
    alias_id = fields.Many2one('mail.alias', "Alias ID")
    medium_id = fields.Many2one('utm.medium', default=lambda self: self.env.ref('utm.utm_medium_website'))

    def _compute_has_domain(self):
        """Set ``has_domain`` on all records from the ``mail.catchall.domain`` parameter."""
        self.has_domain = bool(self.env["ir.config_parameter"].sudo().get_param("mail.catchall.domain"))

    def create_alias(self):
        """Create one ``mail.alias`` per source and store it in ``alias_id``.

        The alias targets ``hr.applicant`` with the source's job as parent
        thread, and pre-fills job, campaign, medium and source on the
        records it creates.
        """
        campaign = self.env.ref('hr_recruitment.utm_campaign_job')
        medium = self.env.ref('utm.utm_medium_email')
        for source in self:
            vals = {
                'alias_parent_thread_id': source.job_id.id,
                'alias_model_id': self.env['ir.model']._get('hr.applicant').id,
                'alias_parent_model_id': self.env['ir.model']._get('hr.job').id,
                'alias_name': "%s+%s" % (source.job_id.alias_name or source.job_id.name, source.name),
                'alias_defaults': {
                    'job_id': source.job_id.id,
                    'campaign_id': campaign.id,
                    'medium_id': medium.id,
                    'source_id': source.source_id.id,
                },
            }
            # check that you can create source before to call mail.alias in sudo with known/controlled vals
            source.check_access_rights('create')
            source.check_access_rule('create')
            source.alias_id = self.env['mail.alias'].sudo().create(vals)

    @api.model
    def _get_view(self, view_id=None, view_type='form', **options):
        """Return ``(arch, view)`` with the ``email`` column dropped from list
        views when no ``mail.catchall.domain`` parameter is set.

        Views that do not contain an ``email`` field are returned unchanged
        instead of raising ``IndexError``.
        """
        arch, view = super()._get_view(view_id, view_type, **options)
        if view_type == 'tree' and not bool(self.env["ir.config_parameter"].sudo().get_param("mail.catchall.domain")):
            # Iterate over the matches rather than indexing [0]: a customised
            # list view may not declare the field at all.
            for email in arch.xpath("//field[@name='email']"):
                email.getparent().remove(email)
        return arch, view
class RecruitmentStage(models.Model):
    """Stage of the recruitment pipeline, ordered by ``sequence``."""
    _name = "hr.recruitment.stage"
    _description = "Recruitment Stages"
    _order = 'sequence'

    name = fields.Char("Stage Name", required=True, translate=True)
    sequence = fields.Integer("Sequence", default=10)
    job_ids = fields.Many2many(
        'hr.job',
        string='Job Specific',
        help='Specific jobs that uses this stage. Other jobs will not use this stage.',
    )
    requirements = fields.Text("Requirements")
    template_id = fields.Many2one(
        'mail.template',
        "Email Template",
        help="If set, a message is posted on the applicant using the template when the applicant is set to the stage.",
    )
    fold = fields.Boolean(
        "Folded in Kanban",
        help="This stage is folded in the kanban view when there are no records in that stage to display.",
    )
    hired_stage = fields.Boolean(
        'Hired Stage',
        help="If checked, this stage is used to determine the hire date of an applicant",
    )
    legend_blocked = fields.Char(
        'Red Kanban Label', default=lambda self: _('Blocked'), translate=True, required=True)
    legend_done = fields.Char(
        'Green Kanban Label', default=lambda self: _('Ready for Next Stage'), translate=True, required=True)
    legend_normal = fields.Char(
        'Grey Kanban Label', default=lambda self: _('In Progress'), translate=True, required=True)
    is_warning_visible = fields.Boolean(compute='_compute_is_warning_visible')

    @api.model
    def default_get(self, fields):
        """Compute defaults, ignoring ``default_job_id`` from the context
        unless the ``hr_recruitment_stage_mono`` context key is set."""
        ctx = self._context
        if ctx and ctx.get('default_job_id') and not ctx.get('hr_recruitment_stage_mono', False):
            trimmed = dict(ctx)
            trimmed.pop('default_job_id')
            self = self.with_context(trimmed)
        return super(RecruitmentStage, self).default_get(fields)

    @api.depends('hired_stage')
    def _compute_is_warning_visible(self):
        """Flag stages whose ``hired_stage`` is being unchecked while
        applicants are still in them."""
        grouped = self.env['hr.applicant']._read_group([('stage_id', 'in', self.ids)], ['stage_id'], 'stage_id')
        applicant_count = {row['stage_id'][0]: row['stage_id_count'] for row in grouped}
        for stage in self:
            origin = stage._origin
            stage.is_warning_visible = bool(
                origin.hired_stage and not stage.hired_stage and applicant_count.get(origin.id)
            )
class RecruitmentDegree(models.Model):
    """Degree that can be selected on an applicant; names are unique."""
    _name = "hr.recruitment.degree"
    _description = "Applicant Degree"

    name = fields.Char("Degree Name", required=True, translate=True)
    sequence = fields.Integer("Sequence", default=1)

    _sql_constraints = [
        ('name_uniq', 'unique (name)', 'The name of the Degree of Recruitment must be unique!'),
    ]
# Job application model: model metadata followed by its field declarations.
class Applicant(models.Model):
_name = "hr.applicant"
_description = "Applicant"
# Highest evaluation first, then most recently created.
_order = "priority desc, id desc"
_inherit = ['mail.thread.cc', 'mail.activity.mixin', 'utm.mixin']
_mailing_enabled = True
# --- Identity and contact data ---
name = fields.Char("Subject / Application", required=True, help="Email subject for applications sent via email", index='trigram')
active = fields.Boolean("Active", default=True, help="If the active field is set to false, it will allow you to hide the case without removing it.")
description = fields.Html("Description")
# Stored, computed from the linked partner; the inverse writes the value back to the partner.
email_from = fields.Char("Email", size=128, compute='_compute_partner_phone_email',
inverse='_inverse_partner_email', store=True)
probability = fields.Float("Probability")
partner_id = fields.Many2one('res.partner', "Contact", copy=False)
create_date = fields.Datetime("Creation Date", readonly=True)
# --- Pipeline position ---
# Stored and editable; recomputed from the job by _compute_stage.
stage_id = fields.Many2one('hr.recruitment.stage', 'Stage', ondelete='restrict', tracking=True,
compute='_compute_stage', store=True, readonly=False,
domain="['|', ('job_ids', '=', False), ('job_ids', '=', job_id)]",
copy=False, index=True,
group_expand='_read_group_stage_ids')
last_stage_id = fields.Many2one('hr.recruitment.stage', "Last Stage",
help="Stage of the applicant before being in the current stage. Used for lost cases analysis.")
categ_ids = fields.Many2many('hr.applicant.category', string="Tags")
company_id = fields.Many2one('res.company', "Company", compute='_compute_company', store=True, readonly=False, tracking=True)
user_id = fields.Many2one(
'res.users', "Recruiter", compute='_compute_user', domain="[('share', '=', False), ('company_ids', 'in', company_id)]",
tracking=True, store=True, readonly=False)
# --- Dates ---
date_closed = fields.Datetime("Hire Date", compute='_compute_date_closed', store=True, readonly=False, tracking=True, copy=False)
# Set by create()/write() in this class whenever a recruiter (user_id) is provided.
date_open = fields.Datetime("Assigned", readonly=True)
date_last_stage_update = fields.Datetime("Last Stage Update", index=True, default=fields.Datetime.now)
priority = fields.Selection(AVAILABLE_PRIORITIES, "Evaluation", default='0')
job_id = fields.Many2one('hr.job', "Applied Job", domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]", tracking=True, index=True)
# --- Salary information, restricted to the group_hr_recruitment_user group ---
salary_proposed_extra = fields.Char("Proposed Salary Extra", help="Salary Proposed by the Organisation, extra advantages", tracking=True, groups="hr_recruitment.group_hr_recruitment_user")
salary_expected_extra = fields.Char("Expected Salary Extra", help="Salary Expected by Applicant, extra advantages", tracking=True, groups="hr_recruitment.group_hr_recruitment_user")
salary_proposed = fields.Float("Proposed Salary", group_operator="avg", help="Salary Proposed by the Organisation", tracking=True, groups="hr_recruitment.group_hr_recruitment_user")
salary_expected = fields.Float("Expected Salary", group_operator="avg", help="Salary Expected by Applicant", tracking=True, groups="hr_recruitment.group_hr_recruitment_user")
availability = fields.Date("Availability", help="The date at which the applicant will be available to start working", tracking=True)
partner_name = fields.Char("Applicant's Name")
# Phone and mobile share the compute used by email_from; each has its own inverse.
partner_phone = fields.Char("Phone", size=32, compute='_compute_partner_phone_email',
inverse='_inverse_partner_phone', store=True)
partner_mobile = fields.Char("Mobile", size=32, compute='_compute_partner_phone_email',
inverse='_inverse_partner_mobile', store=True)
type_id = fields.Many2one('hr.recruitment.degree', "Degree")
department_id = fields.Many2one(
'hr.department', "Department", compute='_compute_department', store=True, readonly=False,
domain="['|', ('company_id', '=', False), ('company_id', '=', company_id)]", tracking=True)
# --- Delays, all produced by _compute_day; only delay_close is stored ---
day_open = fields.Float(compute='_compute_day', string="Days to Open", compute_sudo=True)
day_close = fields.Float(compute='_compute_day', string="Days to Close", compute_sudo=True)
delay_close = fields.Float(compute="_compute_day", string='Delay to Close', readonly=True, group_operator="avg", help="Number of days to close", store=True)
color = fields.Integer("Color Index", default=0)
# --- Employee link and attachments ---
emp_id = fields.Many2one('hr.employee', string="Employee", help="Employee linked to the applicant.", copy=False)
user_email = fields.Char(related='user_id.email', string="User Email", readonly=True)
attachment_number = fields.Integer(compute='_get_attachment_number', string="Number of Attachments")
employee_name = fields.Char(related='emp_id.name', string="Employee Name", readonly=False, tracking=False)
attachment_ids = fields.One2many('ir.attachment', 'res_id', domain=[('res_model', '=', 'hr.applicant')], string='Attachments')
# --- Kanban state; the legends are read from the current stage ---
kanban_state = fields.Selection([
('normal', 'Grey'),
('done', 'Green'),
('blocked', 'Red')], string='Kanban State',
copy=False, default='normal', required=True)
legend_blocked = fields.Char(related='stage_id.legend_blocked', string='Kanban Blocked')
legend_done = fields.Char(related='stage_id.legend_done', string='Kanban Valid')
legend_normal = fields.Char(related='stage_id.legend_normal', string='Kanban Ongoing')
application_count = fields.Integer(compute='_compute_application_count', help='Applications with the same email or phone or mobile')
refuse_reason_id = fields.Many2one('hr.applicant.refuse.reason', string='Refuse Reason', tracking=True)
# --- Meetings ---
meeting_ids = fields.One2many('calendar.event', 'applicant_id', 'Meetings')
meeting_display_text = fields.Char(compute='_compute_meeting_display')
meeting_display_date = fields.Date(compute='_compute_meeting_display')
# UTMs - enforcing the fact that we want to 'set null' when relation is unlinked
campaign_id = fields.Many2one(ondelete='set null')
medium_id = fields.Many2one(ondelete='set null')
source_id = fields.Many2one(ondelete='set null')
interviewer_ids = fields.Many2many('res.users', 'hr_applicant_res_users_interviewers_rel',
string='Interviewers', index=True, tracking=True,
domain="[('share', '=', False), ('company_ids', 'in', company_id)]")
linkedin_profile = fields.Char('LinkedIn Profile')
# Derived from refuse_reason_id and date_closed by _compute_application_status.
application_status = fields.Selection([
('ongoing', 'Ongoing'),
('hired', 'Hired'),
('refused', 'Refused'),
], compute="_compute_application_status")
@api.onchange('job_id')
def _onchange_job_id(self):
    """Copy the selected job's name into the application subject, when the job has one."""
    for record in self:
        job_name = record.job_id.name
        if not job_name:
            continue
        record.name = record.job_id.name
@api.depends('date_open', 'date_closed')
def _compute_day(self):
    """Compute the delays, in days, between creation and assignment / hiring.

    * ``day_open``: days between ``create_date`` and ``date_open``.
    * ``day_close``: days between ``create_date`` and ``date_closed``.
    * ``delay_close``: ``day_close - day_open``.

    Any value that cannot be computed is set to ``False``. That includes
    records with no ``create_date`` yet, for which the subtraction used to
    raise a ``TypeError``.
    """
    seconds_per_day = 24.0 * 3600
    for applicant in self:
        created = applicant.create_date
        if created and applicant.date_open:
            applicant.day_open = (applicant.date_open - created).total_seconds() / seconds_per_day
        else:
            applicant.day_open = False
        if created and applicant.date_closed:
            applicant.day_close = (applicant.date_closed - created).total_seconds() / seconds_per_day
            # day_open is False (i.e. 0) when the applicant was never assigned.
            applicant.delay_close = applicant.day_close - applicant.day_open
        else:
            applicant.day_close = False
            applicant.delay_close = False
@api.depends('email_from', 'partner_phone', 'partner_mobile')
def _compute_application_count(self):
    """Count, per applicant, the other applications sharing its email
    (case-insensitive), phone or mobile number.

    Applicants with none of the three values get ``False``. For the others
    the count comes from a self-join in SQL, restricted by the record rules
    and, when present, by the ``allowed_company_ids`` context key. The
    applicant itself matches the join, hence the ``- 1``.
    """
    # The raw SQL below reads all of these columns, so pending ORM values for
    # each of them must reach the database first (not only email_from).
    self.flush_model(['email_from', 'partner_phone', 'partner_mobile', 'company_id'])
    applicants = self.env['hr.applicant']
    for applicant in self:
        if applicant.email_from or applicant.partner_phone or applicant.partner_mobile:
            applicants |= applicant
    # Done via SQL since read_group does not support grouping by lowercase field
    if applicants.ids:
        query = Query(self.env.cr, self._table, self._table_query)
        query.add_where('hr_applicant.id in %s', [tuple(applicants.ids)])
        # Count into the companies that are selected from the multi-company widget
        company_ids = self.env.context.get('allowed_company_ids')
        if company_ids:
            query.add_where('other.company_id in %s', [tuple(company_ids)])
        self._apply_ir_rules(query)
        # Only the WHERE part of the query object is used; FROM/JOIN are written by hand.
        from_clause, where_clause, where_clause_params = query.get_sql()
        # In case the applicant phone or mobile is configured in wrong field
        query_str = """
            SELECT hr_applicant.id as appl_id,
                COUNT(other.id) as count
            FROM hr_applicant
            JOIN hr_applicant other ON LOWER(other.email_from) = LOWER(hr_applicant.email_from)
                OR other.partner_phone = hr_applicant.partner_phone OR other.partner_phone = hr_applicant.partner_mobile
                OR other.partner_mobile = hr_applicant.partner_mobile OR other.partner_mobile = hr_applicant.partner_phone
            %(where)s
            GROUP BY hr_applicant.id
        """ % {
            'where': ('WHERE %s' % where_clause) if where_clause else '',
        }
        self.env.cr.execute(query_str, where_clause_params)
        application_data_mapped = {data['appl_id']: data['count'] for data in self.env.cr.dictfetchall()}
    else:
        application_data_mapped = {}
    for applicant in applicants:
        # Default of 1 yields a count of 0 for applicants missing from the result.
        applicant.application_count = application_data_mapped.get(applicant.id, 1) - 1
    (self - applicants).application_count = False
@api.depends_context('lang')
@api.depends('meeting_ids', 'meeting_ids.start')
def _compute_meeting_display(self):
    """Compute the meeting label and date displayed for each applicant.

    Without meetings the label is "No Meeting" and the date is empty.
    Otherwise the date is the earliest meeting when it is today or later,
    else the latest one; the label depends on the number of meetings and on
    whether that date is in the past.
    """
    with_meetings = self.filtered('meeting_ids')
    without_meetings = self - with_meetings
    without_meetings.update({
        'meeting_display_text': _('No Meeting'),
        'meeting_display_date': ''
    })
    today = fields.Date.today()
    for applicant in with_meetings:
        starts = applicant.meeting_ids.mapped('start')
        earliest = min(starts).date()
        latest = max(starts).date()
        applicant.meeting_display_date = earliest if earliest >= today else latest
        if len(applicant.meeting_ids) == 1:
            applicant.meeting_display_text = _('1 Meeting')
            continue
        if applicant.meeting_display_date >= today:
            applicant.meeting_display_text = _('Next Meeting')
        else:
            applicant.meeting_display_text = _('Last Meeting')
@api.depends('refuse_reason_id', 'date_closed')
def _compute_application_status(self):
    """Derive the status: refused if a refuse reason is set, else hired if
    a hire date is set, else ongoing."""
    for applicant in self:
        if applicant.refuse_reason_id:
            applicant.application_status = 'refused'
            continue
        applicant.application_status = 'hired' if applicant.date_closed else 'ongoing'
def _get_attachment_number(self):
    """Count the ``ir.attachment`` records attached to each applicant."""
    domain = [('res_model', '=', 'hr.applicant'), ('res_id', 'in', self.ids)]
    grouped = self.env['ir.attachment']._read_group(domain, ['res_id'], ['res_id'])
    count_by_applicant = {row['res_id']: row['res_id_count'] for row in grouped}
    for applicant in self:
        applicant.attachment_number = count_by_applicant.get(applicant.id, 0)
@api.model
def _read_group_stage_ids(self, stages, domain, order):
    """Return the stages to show as columns when grouping by stage.

    Those are the stages with no specific job, the stages of the job given
    by ``default_job_id`` in the context (if any), and the stages already
    present in ``stages``. The search is performed as superuser.
    """
    # Start from the generic stages, then widen the domain with OR branches.
    stage_domain = [('job_ids', '=', False)]
    context_job_id = self._context.get('default_job_id')
    if context_job_id:
        stage_domain = ['|', ('job_ids', '=', context_job_id), *stage_domain]
    if stages:
        stage_domain = ['|', ('id', 'in', stages.ids), *stage_domain]
    found_ids = stages._search(stage_domain, order=order, access_rights_uid=SUPERUSER_ID)
    return stages.browse(found_ids)
@api.depends('job_id', 'department_id')
def _compute_company(self):
    """Take the company from the department, else from the job, else the
    current company of the environment."""
    for applicant in self:
        chosen = applicant.department_id.company_id.id if applicant.department_id else False
        if not chosen and applicant.job_id:
            chosen = applicant.job_id.company_id.id
        applicant.company_id = chosen or self.env.company.id
@api.depends('job_id')
def _compute_department(self):
    """Align the applicant's department with the department of the applied job."""
    for applicant in self:
        job_department = applicant.job_id.department_id
        applicant.department_id = job_department.id
@api.depends('job_id')
def _compute_stage(self):
    """Pick a stage from the applied job.

    Without a job the stage is cleared. With a job, an applicant that has
    no stage yet gets the first non-folded stage (lowest sequence) that is
    either generic or specific to that job; an existing stage is kept.
    """
    Stage = self.env['hr.recruitment.stage']
    for applicant in self:
        if not applicant.job_id:
            applicant.stage_id = False
            continue
        if applicant.stage_id:
            continue
        candidate_ids = Stage.search([
            '|',
            ('job_ids', '=', False),
            ('job_ids', '=', applicant.job_id.id),
            ('fold', '=', False)
        ], order='sequence asc', limit=1).ids
        applicant.stage_id = candidate_ids[0] if candidate_ids else False
@api.depends('job_id')
def _compute_user(self):
    """Set the recruiter to the job's recruiter, falling back on the current user."""
    current_uid = self.env.uid
    for applicant in self:
        applicant.user_id = applicant.job_id.user_id.id or current_uid
@api.depends('partner_id')
def _compute_partner_phone_email(self):
    """Copy phone, mobile and email from the linked partner.

    Applicants without a partner are left untouched.
    """
    for applicant in self:
        partner = applicant.partner_id
        if not partner:
            continue
        applicant.partner_phone = partner.phone
        applicant.partner_mobile = partner.mobile
        applicant.email_from = partner.email
def _inverse_partner_email(self):
    """Push the applicant's email to its partner when the partner has none."""
    def partner_lacks_email(applicant):
        return applicant.partner_id and applicant.email_from and not applicant.partner_id.email

    # Select the applicants first, then write, so every check sees the partners' initial values.
    for applicant in self.filtered(partner_lacks_email):
        applicant.partner_id.email = applicant.email_from
def _inverse_partner_phone(self):
    """Push the applicant's phone to its partner when the partner has none."""
    def partner_lacks_phone(applicant):
        return applicant.partner_id and applicant.partner_phone and not applicant.partner_id.phone

    # Select the applicants first, then write, so every check sees the partners' initial values.
    for applicant in self.filtered(partner_lacks_phone):
        applicant.partner_id.phone = applicant.partner_phone
def _inverse_partner_mobile(self):
    """Push the applicant's mobile to its partner when the partner has none."""
    def partner_lacks_mobile(applicant):
        return applicant.partner_id and applicant.partner_mobile and not applicant.partner_id.mobile

    # Select the applicants first, then write, so every check sees the partners' initial values.
    for applicant in self.filtered(partner_lacks_mobile):
        applicant.partner_id.mobile = applicant.partner_mobile
@api.depends('stage_id.hired_stage')
def _compute_date_closed(self):
    """Maintain the hire date from the stage's ``hired_stage`` flag.

    Entering a hired stage without a hire date stamps the current time; any
    stage that is not a hired stage (or no stage) clears the date. An
    existing hire date is kept while the applicant stays in a hired stage.
    """
    for applicant in self:
        if applicant.stage_id and applicant.stage_id.hired_stage and not applicant.date_closed:
            # Use the ORM helper like the rest of the file, instead of the
            # stdlib ``datetime`` class that is merely importable from
            # ``odoo.fields``.
            applicant.date_closed = fields.Datetime.now()
        if not applicant.stage_id.hired_stage:
            applicant.date_closed = False
def _check_interviewer_access(self):
    """Raise ``AccessError`` when the current user belongs to the
    recruitment interviewer group."""
    is_interviewer = self.user_has_groups('hr_recruitment.group_hr_recruitment_interviewer')
    if not is_interviewer:
        return
    raise AccessError(_('You are not allowed to perform this action.'))
@api.model_create_multi
def create(self, vals_list):
    """Create applicants.

    Before creation, ``date_open`` is stamped when a recruiter is given and
    ``email_from`` is stripped. After creation, the interviewers are
    registered and, when ``default_activity_date_deadline`` is in the
    context, a 30-minute interview event is created for each applicant.
    """
    for values in vals_list:
        if values.get('user_id'):
            values['date_open'] = fields.Datetime.now()
        email = values.get('email_from')
        if email:
            values['email_from'] = values['email_from'].strip()
    applicants = super().create(vals_list)
    applicants.sudo().interviewer_ids._create_recruitment_interviewers()
    # Record creation through calendar, creates the calendar event directly, it will also create the activity.
    if 'default_activity_date_deadline' not in self.env.context:
        return applicants
    deadline = fields.Datetime.to_datetime(self.env.context.get('default_activity_date_deadline'))
    category = self.env.ref('hr_recruitment.categ_meet_interview')
    for applicant in applicants:
        attendees = applicant.partner_id | applicant.user_id.partner_id | applicant.department_id.manager_id.user_id.partner_id
        events = self.env['calendar.event'].sudo().with_context(default_applicant_id=applicant.id)
        events.create({
            'applicant_id': applicant.id,
            'partner_ids': [(6, 0, attendees.ids)],
            'user_id': self.env.uid,
            'name': applicant.name,
            'categ_ids': [category.id],
            'start': deadline,
            'stop': deadline + relativedelta(minutes=30),
        })
    return applicants
def write(self, vals):
    """Write on applicants, maintaining the derived bookkeeping fields.

    * A truthy ``user_id`` stamps ``date_open``.
    * A truthy ``email_from`` is stripped.
    * A ``stage_id`` change stamps ``date_last_stage_update``, resets
      ``kanban_state`` to ``'normal'`` unless one is given, and records
      each applicant's own previous stage in ``last_stage_id``.
    * An ``interviewer_ids`` change updates the interviewer users.
    * A truthy ``emp_id`` triggers ``_update_employee_from_applicant``.

    :param dict vals: values to write (updated in place with the stamps above)
    :return: the result of the parent ``write``
    """
    # user_id change: update date_open
    if vals.get('user_id'):
        vals['date_open'] = fields.Datetime.now()
    if vals.get('email_from'):
        vals['email_from'] = vals['email_from'].strip()
    old_interviewers = self.interviewer_ids
    # stage_id: track last stage before update
    if 'stage_id' in vals:
        vals['date_last_stage_update'] = fields.Datetime.now()
        if 'kanban_state' not in vals:
            vals['kanban_state'] = 'normal'
        # Applicants may currently sit in different stages: write them in
        # batches sharing the same previous stage, so that each one keeps
        # its own ``last_stage_id`` instead of another record's.
        ids_by_previous_stage = {}
        for applicant in self:
            ids_by_previous_stage.setdefault(applicant.stage_id.id, []).append(applicant.id)
        if ids_by_previous_stage:
            res = True
            for previous_stage_id, applicant_ids in ids_by_previous_stage.items():
                batch = self.browse(applicant_ids)
                res = super(Applicant, batch).write(dict(vals, last_stage_id=previous_stage_id)) and res
        else:
            # Empty recordset: still go through the parent so ``res`` is defined.
            res = super(Applicant, self).write(vals)
    else:
        res = super(Applicant, self).write(vals)
    if 'interviewer_ids' in vals:
        interviewers_to_clean = old_interviewers - self.interviewer_ids
        interviewers_to_clean._remove_recruitment_interviewers()
        self.sudo().interviewer_ids._create_recruitment_interviewers()
    if vals.get('emp_id'):
        self._update_employee_from_applicant()
    return res
def get_empty_list_help(self, help):
    """Build the HTML shown when the applicant list is empty.

    When the list is opened from a job (``active_model`` is ``hr.job``) whose
    alias has both a name and a domain, a mailto link to that alias is
    appended.
    """
    context = self.env.context
    alias = False
    if 'active_id' in context and context.get('active_model') == 'hr.job':
        alias = self.env['hr.job'].browse(context['active_id']).alias_id

    values = {
        'help_title': _("No application found. Let's create one !"),
        'para_1': _('People can also apply by email to save time.'),
        'para_2': _("You can search into attachment's content, like resumes, with the searchbar."),
    }
    body = (
        '\n'
        '<p class="o_view_nocontent_smiling_face">%(help_title)s</p>\n'
        '<p>%(para_1)s<br/>%(para_2)s</p>'
    )

    if alias and alias.alias_domain and alias.alias_name:
        address = alias.display_name
        values['email_link'] = "<a href='mailto:%s'>%s</a>" % (address, address)
        body += """<p class="o_copy_paste_email">%(email_link)s</p>"""

    return body % values
@api.model
def get_view(self, view_id=None, view_type='form', **options):
    """Serve the dedicated interviewer form view to users of the
    recruitment interviewer group; other requests are passed through."""
    if view_type == 'form':
        if self.user_has_groups('hr_recruitment.group_hr_recruitment_interviewer'):
            interviewer_form = self.env.ref('hr_recruitment.hr_applicant_view_form_interviewer')
            view_id = interviewer_form.id
    return super().get_view(view_id, view_type, **options)
def _notify_get_recipients(self, message, msg_vals, **kwargs):
    """
    Filter out the recipients belonging to the Recruitment Interviewer
    group: notifying them might leak some data they shouldn't have
    access to.
    """
    all_recipients = super()._notify_get_recipients(message, msg_vals, **kwargs)
    interviewer_group_id = self.env.ref('hr_recruitment.group_hr_recruitment_interviewer').id
    allowed = []
    for recipient in all_recipients:
        if interviewer_group_id in recipient['groups']:
            continue
        allowed.append(recipient)
    return allowed
def action_makeMeeting(self):
    """ Open the calendar to schedule a meeting for this applicant.

    A private contact is created from the applicant's data when none is
    linked yet.

    @return: dictionary of the calendar event action, with defaults in context
    """
    self.ensure_one()
    if not self.partner_id:
        if not self.partner_name:
            raise UserError(_('You must define a Contact Name for this applicant.'))
        contact_values = {
            'is_company': False,
            'type': 'private',
            'name': self.partner_name,
            'email': self.email_from,
            'phone': self.partner_phone,
            'mobile': self.partner_mobile
        }
        self.partner_id = self.env['res.partner'].create(contact_values)

    # Applicant contact, recruiter and department manager are invited by default.
    attendees = self.partner_id | self.user_id.partner_id | self.department_id.manager_id.user_id.partner_id
    category = self.env.ref('hr_recruitment.categ_meet_interview')
    action = self.env['ir.actions.act_window']._for_xml_id('calendar.action_calendar_event')
    action['context'] = {
        'default_applicant_id': self.id,
        'default_partner_ids': attendees.ids,
        'default_user_id': self.env.uid,
        'default_name': self.name,
        'default_categ_ids': [category.id] if category else False,
    }
    return action
def action_open_attachments(self):
    """Return the window action listing the attachments of these applicants.

    New attachments default to the first applicant of the recordset.
    """
    action_context = {
        'default_res_model': 'hr.applicant',
        'default_res_id': self.ids[0],
        'show_partner_name': 1,
    }
    list_view = self.env.ref('hr_recruitment.ir_attachment_hr_recruitment_list_view')
    search_view = self.env.ref('hr_recruitment.ir_attachment_view_search_inherit_hr_recruitment')
    attachment_domain = [('res_model', '=', 'hr.applicant'), ('res_id', 'in', self.ids), ]
    return {
        'type': 'ir.actions.act_window',
        'res_model': 'ir.attachment',
        'name': _('Documents'),
        'context': action_context,
        'view_mode': 'tree,form',
        'views': [
            (list_view.id, 'tree'),
            (False, 'form'),
        ],
        'search_view_id': search_view.ids,
        'domain': attachment_domain,
    }
def action_applications_email(self):
    """Open the applications sharing this applicant's email
    (case-insensitive), phone or mobile number, archived ones included."""
    self.ensure_one()
    # Phone and mobile are cross-compared in case a number sits in the wrong field.
    matching_sql = (
        "\n"
        "SELECT other.id\n"
        "FROM hr_applicant\n"
        "JOIN hr_applicant other ON LOWER(other.email_from) = LOWER(hr_applicant.email_from)\n"
        "OR other.partner_phone = hr_applicant.partner_phone OR other.partner_phone = hr_applicant.partner_mobile\n"
        "OR other.partner_mobile = hr_applicant.partner_mobile OR other.partner_mobile = hr_applicant.partner_phone\n"
        "WHERE hr_applicant.id in %s\n"
    )
    self.env.cr.execute(matching_sql, (tuple(self.ids),))
    matching_ids = [row['id'] for row in self.env.cr.dictfetchall()]
    action = {
        'type': 'ir.actions.act_window',
        'name': _('Job Applications'),
        'res_model': self._name,
        'view_mode': 'tree,kanban,form,pivot,graph,calendar,activity',
        'domain': [('id', 'in', matching_ids)],
        'context': {
            'active_test': False,
            'search_default_stage': 1,
        },
    }
    return action
def action_open_employee(self):
    """Return the window action opening the form of the linked employee."""
    self.ensure_one()
    action = {
        'name': _('Employee'),
        'type': 'ir.actions.act_window',
        'res_model': 'hr.employee',
        'view_mode': 'form',
    }
    action['res_id'] = self.emp_id.id
    return action
def _track_template(self, changes):
    """On a stage change, add the new stage's email template (if it has
    one) to the tracking templates, posted as a note with the light layout."""
    res = super(Applicant, self)._track_template(changes)
    first = self[0]
    if 'stage_id' not in changes or not first.stage_id.template_id:
        return res
    template_options = {
        'auto_delete_message': True,
        'subtype_id': self.env['ir.model.data']._xmlid_to_res_id('mail.mt_note'),
        'email_layout_xmlid': 'mail.mail_notification_light'
    }
    res['stage_id'] = (first.stage_id.template_id, template_options)
    return res
def _creation_subtype(self):
    """Return the message subtype used when an applicant is created."""
    new_applicant_subtype = self.env.ref('hr_recruitment.mt_applicant_new')
    return new_applicant_subtype
def _track_subtype(self, init_values):
    """Use the "stage changed" subtype when the stage was modified and the
    first record has a stage; otherwise defer to the parent."""
    first = self[0]
    if 'stage_id' not in init_values or not first.stage_id:
        return super(Applicant, self)._track_subtype(init_values)
    return self.env.ref('hr_recruitment.mt_applicant_stage_changed')
def _notify_get_reply_to(self, default=None):
    """ Override: applicants with a job use the reply-to of that job; the
    others fall back on the parent implementation. """
    job_reply_to = self.mapped('job_id')._notify_get_reply_to(default=default)
    reply_to = {}
    for applicant in self:
        reply_to[applicant.id] = job_reply_to.get(applicant.job_id.id)
    without_job = self.filtered(lambda rec: not rec.job_id)
    if without_job:
        fallback = super(Applicant, without_job)._notify_get_reply_to(default=default)
        reply_to.update(fallback)
    return reply_to
def _message_get_suggested_recipients(self):
# Extend the parent's suggested recipients with the applicant's contact:
# - an applicant with a linked partner suggests that partner (reason "Contact");
# - otherwise, an applicant with an email has that email normalized and, when
#   both the normalized email and the applicant's name are available, formatted
#   as a full address, to be suggested with the reason "Contact Email".
# NOTE(review): indentation is missing in this copy of the file, so it cannot
# be told from here whether the "Contact Email" suggestion is added for every
# applicant with an email or only when the address was formatted with the
# name - confirm against the upstream file before restructuring.
recipients = super(Applicant, self)._message_get_suggested_recipients()
for applicant in self:
if applicant.partner_id:
# sudo(): read the partner with elevated rights when building the suggestion.
applicant._message_add_suggested_recipient(recipients, partner=applicant.partner_id.sudo(), reason=_('Contact'))
elif applicant.email_from:
email_from = tools.email_normalize(applicant.email_from)
if email_from and applicant.partner_name:
email_from = tools.formataddr((applicant.partner_name, email_from))
applicant._message_add_suggested_recipient(recipients, email=email_from, reason=_('Contact Email'))
return recipients
def name_get(self):
    """With ``show_partner_name`` in the context, display the applicant's
    name (falling back on the subject); otherwise use the parent naming."""
    if not self.env.context.get('show_partner_name'):
        return super().name_get()
    names = []
    for applicant in self:
        names.append((applicant.id, applicant.partner_name or applicant.name))
    return names
@api.model
def message_new(self, msg, custom_values=None):
    """ Override of mail_thread ``message_new``, called by the mailgateway
    through message_process, to build the applicant values from the email:
    subject, sender name and address, author, priority and, when a job is
    given in ``custom_values``, the first stage of that job.
    ``custom_values`` take precedence over the values read from the email.
    """
    # remove default author when going through the mail gateway. Indeed we
    # do not want to explicitly set user_id to False; however we do not
    # want the gateway user to be responsible if no other responsible is
    # found.
    self = self.with_context(default_user_id=False)

    first_stage = False
    if custom_values and 'job_id' in custom_values:
        job = self.env['hr.job'].browse(custom_values['job_id'])
        first_stage = job._get_first_stage()

    sender_name, sender_email = self.env['res.partner']._parse_partner_name(msg.get('from'))
    applicant_values = {
        'name': msg.get('subject') or _("No Subject"),
        'partner_name': sender_name or sender_email,
        'email_from': sender_email,
        'partner_id': msg.get('author_id', False),
    }
    if msg.get('priority'):
        applicant_values['priority'] = msg.get('priority')
    if first_stage and first_stage.id:
        applicant_values['stage_id'] = first_stage.id
    if custom_values:
        applicant_values.update(custom_values)
    return super(Applicant, self).message_new(msg, custom_values=applicant_values)
def _message_post_after_hook(self, message, msg_vals):
# After a message is posted on an applicant that has an email but no linked
# partner, look among the message recipients for a partner with that email and
# link it to the open applications sharing the address.
# NOTE(review): indentation is missing in this copy of the file; the comments
# below describe the nesting implied by the data flow (every use of
# new_partner[0] and of email_domain requires a matching partner) - confirm
# against the upstream file, in particular that only the partner update
# depends on the partner having been created today.
if self.email_from and not self.partner_id:
# we consider that posting a message with a specified recipient (not a follower, a specific one)
# on a document without customer means that it was created through the chatter using
# suggested recipients. This heuristic allows to avoid ugly hacks in JS.
email_normalized = tools.email_normalize(self.email_from)
# Match on the raw email, or on the normalized form when normalization succeeded.
new_partner = message.partner_ids.filtered(
lambda partner: partner.email == self.email_from or (email_normalized and partner.email_normalized == email_normalized)
)
if new_partner:
# A partner created today is completed with the applicant's name and phone numbers.
if new_partner[0].create_date.date() == fields.Date.today():
new_partner[0].write({
'type': 'private',
'name': self.partner_name or self.email_from,
'phone': self.partner_phone,
'mobile': self.partner_mobile,
})
# Build the domain leaf matching applications by the partner's email.
if new_partner[0].email_normalized:
email_domain = ('email_from', 'in', [new_partner[0].email, new_partner[0].email_normalized])
else:
email_domain = ('email_from', '=', new_partner[0].email)
# Link the partner to every partner-less application with that email in a non-folded stage.
self.search([
('partner_id', '=', False), email_domain, ('stage_id.fold', '=', False)
]).write({'partner_id': new_partner[0].id})
return super(Applicant, self)._message_post_after_hook(message, msg_vals)
def create_employee_from_applicant(self):
    """ Open the employee action pre-filled from this applicant.

    Interviewers are refused. When the applicant has no contact yet, a
    private one is created from its data (a name is then required).
    """
    self.ensure_one()
    self._check_interviewer_access()

    contact_name = False
    partner = self.partner_id
    if partner:
        address_id = partner.address_get(['contact'])['contact']
        contact_name = partner.display_name
    else:
        if not self.partner_name:
            raise UserError(_('You must define a Contact Name for this applicant.'))
        partner = self.env['res.partner'].create({
            'is_company': False,
            'type': 'private',
            'name': self.partner_name,
            'email': self.email_from,
            'phone': self.partner_phone,
            'mobile': self.partner_mobile
        })
        self.partner_id = partner
        address_id = partner.address_get(['contact'])['contact']

    department_company = self.department_id.company_id
    employee_defaults = {
        'default_name': self.partner_name or contact_name,
        'default_job_id': self.job_id.id,
        'default_job_title': self.job_id.name,
        'default_address_home_id': address_id,
        'default_department_id': self.department_id.id,
        'default_address_id': self.company_id.partner_id.id,
        'default_work_email': department_company.email or self.email_from, # To have a valid email address by default
        'default_work_phone': department_company.phone,
        'form_view_initial_mode': 'edit',
        'default_applicant_id': self.ids,
    }
    action = self.env['ir.actions.act_window']._for_xml_id('hr.open_view_employee_list')
    action['context'] = employee_defaults
    return action
def _update_employee_from_applicant(self):
    """Hook called by ``write`` when an employee is set; does nothing here."""
    # Meant to be overridden.
    return None
def archive_applicant(self):
    """Return the action opening the refuse-reason wizard for these applicants."""
    wizard_context = {'default_applicant_ids': self.ids, 'active_test': False}
    action = {
        'type': 'ir.actions.act_window',
        'name': _('Refuse Reason'),
        'res_model': 'applicant.get.refuse.reason',
        'view_mode': 'form',
        'target': 'new',
        'context': wizard_context,
        'views': [[False, 'form']]
    }
    return action
def reset_applicant(self):
    """ Put the applicants back in the first stage of their job's pipeline
    and clear their refuse reason. Applicants without a job get no stage. """
    Stage = self.env['hr.recruitment.stage']
    first_stage_by_job = {
        job.id: Stage.search([
            '|',
            ('job_ids', '=', False),
            ('job_ids', '=', job.id),
            ('fold', '=', False)
        ], order='sequence asc', limit=1).id
        for job in self.mapped('job_id')
    }
    for applicant in self:
        job_id = applicant.job_id.id
        applicant.write({
            'stage_id': job_id and first_stage_by_job[job_id],
            'refuse_reason_id': False,
        })
def toggle_active(self):
    """Toggle ``active``; unarchived applicants re-enter the pipeline, and
    if any were archived the refuse-reason wizard action is returned."""
    res = super(Applicant, self).toggle_active()
    restored = self.filtered(lambda applicant: applicant.active)
    if restored:
        restored.reset_applicant()
    archived = self.filtered(lambda applicant: not applicant.active)
    if not archived:
        return res
    return archived.archive_applicant()
def action_send_email(self):
    """Return the action opening the send-mail wizard for these applicants."""
    wizard_context = {
        'default_applicant_ids': self.ids,
    }
    return {
        'name': _('Send Email'),
        'type': 'ir.actions.act_window',
        'target': 'new',
        'view_mode': 'form',
        'res_model': 'applicant.send.mail',
        'context': wizard_context
    }
class ApplicantCategory(models.Model):
    """Tag for applicants; tag names are unique."""
    _name = "hr.applicant.category"
    _description = "Category of applicant"

    def _get_default_color(self):
        """Return a random color index between 1 and 11, both included."""
        return randint(1, 11)

    name = fields.Char("Tag Name", required=True)
    color = fields.Integer(string='Color Index', default=_get_default_color)

    _sql_constraints = [
        ('name_uniq', 'unique (name)', "Tag name already exists !"),
    ]
class ApplicantRefuseReason(models.Model):
    """Reason for refusing an applicant, with an optional email template."""
    _name = "hr.applicant.refuse.reason"
    _description = 'Refuse Reason of Applicant'

    name = fields.Char('Description', required=True, translate=True)
    # Only templates defined for the applicant model can be selected.
    template_id = fields.Many2one(
        'mail.template',
        string='Email Template',
        domain="[('model', '=', 'hr.applicant')]",
    )
    active = fields.Boolean('Active', default=True)