324 lines
12 KiB
Python
324 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
|
|
from collections import defaultdict
|
|
|
|
from odoo import api, fields, models, tools, _
|
|
from odoo.exceptions import UserError
|
|
from odoo.tools import SQL
|
|
|
|
|
|
class PrivacyLookupWizard(models.TransientModel):
|
|
_name = 'privacy.lookup.wizard'
|
|
_description = 'Privacy Lookup Wizard'
|
|
_transient_max_count = 0
|
|
_transient_max_hours = 24
|
|
|
|
name = fields.Char(required=True)
|
|
email = fields.Char(required=True)
|
|
line_ids = fields.One2many('privacy.lookup.wizard.line', 'wizard_id')
|
|
execution_details = fields.Text(compute='_compute_execution_details', store=True)
|
|
log_id = fields.Many2one('privacy.log')
|
|
records_description = fields.Text(compute='_compute_records_description')
|
|
line_count = fields.Integer(compute='_compute_line_count')
|
|
|
|
@api.depends('line_ids')
|
|
def _compute_line_count(self):
|
|
for wizard in self:
|
|
wizard.line_count = len(wizard.line_ids)
|
|
|
|
def _compute_display_name(self):
|
|
self.display_name = _('Privacy Lookup')
|
|
|
|
def _get_query_models_blacklist(self):
|
|
return [
|
|
# Already Managed
|
|
'res.partner',
|
|
'res.users',
|
|
# Ondelete Cascade
|
|
'mail.notification',
|
|
'mail.followers',
|
|
'discuss.channel.member',
|
|
# Special case for direct messages
|
|
'mail.message',
|
|
]
|
|
|
|
def _get_query(self):
|
|
name = self.name.strip()
|
|
email = f"%{self.email.strip()}%"
|
|
email_normalized = tools.email_normalize(self.email.strip())
|
|
|
|
# Step 1: Retrieve users/partners liked to email address or name
|
|
query = SQL("""
|
|
WITH indirect_references AS (
|
|
SELECT id
|
|
FROM res_partner
|
|
WHERE email_normalized = %s
|
|
OR name ilike %s)
|
|
SELECT
|
|
%s AS res_model_id,
|
|
id AS res_id,
|
|
active AS is_active
|
|
FROM res_partner
|
|
WHERE id IN (SELECT id FROM indirect_references)
|
|
UNION ALL
|
|
SELECT
|
|
%s AS res_model_id,
|
|
id AS res_id,
|
|
active AS is_active
|
|
FROM res_users
|
|
WHERE (
|
|
(login ilike %s)
|
|
OR
|
|
(partner_id IN (
|
|
SELECT id
|
|
FROM res_partner
|
|
WHERE email ilike %s or name ilike %s)))
|
|
-- Step 2: Special case for direct messages
|
|
UNION ALL
|
|
SELECT
|
|
%s AS res_model_id,
|
|
id AS res_id,
|
|
True AS is_active
|
|
FROM mail_message
|
|
WHERE author_id IN (SELECT id FROM indirect_references)
|
|
""",
|
|
# Indirect references CTE
|
|
email_normalized, name,
|
|
# Search on res.partner
|
|
self.env['ir.model.data']._xmlid_to_res_id('base.model_res_partner'),
|
|
# Search on res.users
|
|
self.env['ir.model.data']._xmlid_to_res_id('base.model_res_users'), email, email, name,
|
|
# Direct messages
|
|
self.env['ir.model.data']._xmlid_to_res_id('mail.model_mail_message'),
|
|
)
|
|
|
|
# Step 3: Retrieve info on other models
|
|
blacklisted_models = self._get_query_models_blacklist()
|
|
for model_name in self.env:
|
|
if model_name in blacklisted_models:
|
|
continue
|
|
|
|
model = self.env[model_name]
|
|
if model._transient or not model._auto:
|
|
continue
|
|
|
|
table_name = model._table
|
|
|
|
conditions = []
|
|
# 3.1 Search Basic Personal Data Records (aka email/name usage)
|
|
for field_name in ['email_normalized', 'email', 'email_from', 'company_email']:
|
|
if field_name in model and model._fields[field_name].store:
|
|
rec_name = model._rec_name or 'name'
|
|
is_normalized = field_name == 'email_normalized' or (model_name == 'mailing.trace' and field_name == 'email')
|
|
|
|
conditions.append(SQL(
|
|
"%s %s %s",
|
|
SQL.identifier(field_name),
|
|
SQL('=') if is_normalized else SQL('ilike'), # Manage Foo Bar <foo@bar.com>
|
|
email_normalized if is_normalized else email
|
|
))
|
|
if rec_name in model and model._fields[model._rec_name].store and model._fields[model._rec_name].type == 'char' and not model._fields[model._rec_name].translate:
|
|
conditions.append(SQL(
|
|
"%s ilike %s",
|
|
SQL.identifier(rec_name),
|
|
name,
|
|
))
|
|
if is_normalized:
|
|
break
|
|
|
|
# 3.2 Search Indirect Personal Data References (aka partner_id)
|
|
conditions.extend(
|
|
SQL(
|
|
"%s in (SELECT id FROM indirect_references)",
|
|
model._field_to_sql(table_name, field_name),
|
|
)
|
|
for field_name, field in model._fields.items()
|
|
if field.comodel_name == 'res.partner'
|
|
if field.store
|
|
if field.type == 'many2one'
|
|
if field.ondelete != 'cascade'
|
|
)
|
|
|
|
if conditions:
|
|
query = SQL("""
|
|
%s
|
|
UNION ALL
|
|
SELECT
|
|
%s AS res_model_id,
|
|
id AS res_id,
|
|
%s AS is_active
|
|
FROM %s
|
|
WHERE %s
|
|
""",
|
|
query,
|
|
self.env['ir.model'].search([('model', '=', model_name)]).id,
|
|
SQL.identifier('active') if 'active' in model else True,
|
|
SQL.identifier(table_name),
|
|
SQL(" OR ").join(conditions),
|
|
)
|
|
return query
|
|
|
|
def action_lookup(self):
|
|
self.ensure_one()
|
|
query = self._get_query()
|
|
self.env.flush_all()
|
|
self.env.cr.execute(query)
|
|
results = self.env.cr.dictfetchall()
|
|
self.line_ids = [(5, 0, 0)] + [(0, 0, reference) for reference in results]
|
|
return self.action_open_lines()
|
|
|
|
def _post_log(self):
|
|
self.ensure_one()
|
|
if not self.log_id and self.execution_details:
|
|
self.log_id = self.env['privacy.log'].create({
|
|
'anonymized_name': self.name,
|
|
'anonymized_email': self.email,
|
|
'execution_details': self.execution_details,
|
|
'records_description': self.records_description,
|
|
})
|
|
else:
|
|
self.log_id.execution_details = self.execution_details
|
|
self.log_id.records_description = self.records_description
|
|
|
|
@api.depends('line_ids.execution_details')
|
|
def _compute_execution_details(self):
|
|
for wizard in self:
|
|
wizard.execution_details = '\n'.join(line.execution_details for line in wizard.line_ids if line.execution_details)
|
|
wizard._post_log()
|
|
|
|
@api.depends('line_ids')
|
|
def _compute_records_description(self):
|
|
for wizard in self:
|
|
if not wizard.line_ids:
|
|
wizard.records_description = ''
|
|
continue
|
|
records_by_model = defaultdict(list)
|
|
for line in wizard.line_ids:
|
|
records_by_model[line.res_model_id].append(line.res_id)
|
|
wizard.records_description = '\n'.join('{model_name} ({count}): {ids_str}'.format(
|
|
model_name=(f'{model.name} - {model.model}' if self.env.user.has_group('base.group_no_one') else model.name),
|
|
count=len(ids),
|
|
ids_str=', '.join('#%s' % (rec_id) for rec_id in ids),
|
|
) for model, ids in records_by_model.items())
|
|
|
|
def action_open_lines(self):
|
|
self.ensure_one()
|
|
action = self.env['ir.actions.act_window']._for_xml_id('privacy_lookup.action_privacy_lookup_wizard_line')
|
|
action['domain'] = [('wizard_id', '=', self.id)]
|
|
return action
|
|
|
|
|
|
class PrivacyLookupWizardLine(models.TransientModel):
|
|
_name = 'privacy.lookup.wizard.line'
|
|
_description = 'Privacy Lookup Wizard Line'
|
|
_transient_max_count = 0
|
|
_transient_max_hours = 24
|
|
|
|
@api.model
|
|
def _selection_target_model(self):
|
|
return [(model.model, model.name) for model in self.env['ir.model'].sudo().search([])]
|
|
|
|
wizard_id = fields.Many2one('privacy.lookup.wizard')
|
|
res_id = fields.Integer(
|
|
string="Resource ID",
|
|
required=True)
|
|
res_name = fields.Char(
|
|
string='Resource name',
|
|
compute='_compute_res_name',
|
|
store=True)
|
|
res_model_id = fields.Many2one(
|
|
'ir.model',
|
|
'Related Document Model',
|
|
ondelete='cascade')
|
|
res_model = fields.Char(
|
|
string='Document Model',
|
|
related='res_model_id.model',
|
|
store=True,
|
|
readonly=True)
|
|
resource_ref = fields.Reference(
|
|
string='Record',
|
|
selection='_selection_target_model',
|
|
compute='_compute_resource_ref',
|
|
inverse='_set_resource_ref')
|
|
has_active = fields.Boolean(compute='_compute_has_active', store=True)
|
|
is_active = fields.Boolean()
|
|
is_unlinked = fields.Boolean()
|
|
execution_details = fields.Char(default='')
|
|
|
|
@api.depends('res_model', 'res_id', 'is_unlinked')
|
|
def _compute_resource_ref(self):
|
|
for line in self:
|
|
if line.res_model and line.res_model in self.env and not line.is_unlinked:
|
|
# Exclude records that can't be read (eg: multi-company ir.rule)
|
|
try:
|
|
self.env[line.res_model].browse(line.res_id).check_access('read')
|
|
line.resource_ref = '%s,%s' % (line.res_model, line.res_id or 0)
|
|
except Exception:
|
|
line.resource_ref = None
|
|
else:
|
|
line.resource_ref = None
|
|
|
|
def _set_resource_ref(self):
|
|
for line in self:
|
|
if line.resource_ref:
|
|
line.res_id = line.resource_ref.id
|
|
|
|
@api.depends('res_model_id')
|
|
def _compute_has_active(self):
|
|
for line in self:
|
|
if not line.res_model_id:
|
|
line.has_active = False
|
|
continue
|
|
line.has_active = 'active' in self.env[line.res_model]
|
|
|
|
@api.depends('res_model', 'res_id')
|
|
def _compute_res_name(self):
|
|
for line in self:
|
|
if not line.res_id or not line.res_model:
|
|
continue
|
|
record = self.env[line.res_model].sudo().browse(line.res_id)
|
|
if not record.exists():
|
|
continue
|
|
name = record.display_name
|
|
line.res_name = name if name else f'{line.res_model_id.name}/{line.res_id}'
|
|
|
|
@api.onchange('is_active')
|
|
def _onchange_is_active(self):
|
|
for line in self:
|
|
if not line.res_model_id or not line.res_id:
|
|
continue
|
|
action = _('Unarchived') if line.is_active else _('Archived')
|
|
line.execution_details = '%s %s #%s' % (action, line.res_model_id.name, line.res_id)
|
|
self.env[line.res_model].sudo().browse(line.res_id).write({'active': line.is_active})
|
|
|
|
def action_unlink(self):
|
|
self.ensure_one()
|
|
if self.is_unlinked:
|
|
raise UserError(_('The record is already unlinked.'))
|
|
self.env[self.res_model].sudo().browse(self.res_id).unlink()
|
|
self.execution_details = '%s %s #%s' % (_('Deleted'), self.res_model_id.name, self.res_id)
|
|
self.is_unlinked = True
|
|
|
|
def action_archive_all(self):
|
|
for line in self:
|
|
if not line.has_active or not line.is_active:
|
|
continue
|
|
line.is_active = False
|
|
line._onchange_is_active()
|
|
|
|
def action_unlink_all(self):
|
|
for line in self:
|
|
if line.is_unlinked:
|
|
continue
|
|
line.action_unlink()
|
|
|
|
def action_open_record(self):
|
|
self.ensure_one()
|
|
return {
|
|
'type': 'ir.actions.act_window',
|
|
'view_mode': 'form',
|
|
'res_id': self.res_id,
|
|
'res_model': self.res_model,
|
|
}
|