Odoo18-Base/addons/account/wizard/account_merge_wizard.py
2025-01-06 10:57:38 +07:00

352 lines
15 KiB
Python

import json
from odoo import _, api, fields, models, Command
from odoo.exceptions import UserError
from odoo.tools import SQL
class AccountMergeWizard(models.TransientModel):
    # Transient wizard that splits the selected accounts into groups sharing the same
    # grouping key and merges every group of selected lines into a single account.
    _name = 'account.merge.wizard'
    _description = "Account merge wizard"

    # Accounts the wizard operates on (filled from the context by `default_get`).
    account_ids = fields.Many2many('account.account')
    is_group_by_name = fields.Boolean(
        string="Group by name?",
        default=False,
        help="Tick this checkbox if you want accounts to be grouped by name for merging."
    )
    # Section + account lines, rebuilt by `_compute_wizard_line_ids`; stored and writable.
    wizard_line_ids = fields.One2many(
        comodel_name='account.merge.wizard.line',
        inverse_name='wizard_id',
        compute='_compute_wizard_line_ids',
        store=True,
        readonly=False,
    )
    # True when no group contains at least two selected, error-free account lines.
    disable_merge_button = fields.Boolean(compute='_compute_disable_merge_button')

    @api.model
    def default_get(self, fields):
        """ Prefill `account_ids` with the records listed in the `active_ids` context key.

        Nothing is added when neither `account_ids` nor `wizard_line_ids` is requested,
        or when the parent implementation already returned a value for one of them.

        :param fields: names of the fields for which a default value is requested
        :return: dict of default values
        :raises UserError: if the context's `active_model` is not `account.account`,
            or if fewer than two `active_ids` are provided
        """
        res = super().default_get(fields)
        if not set(fields) & {'account_ids', 'wizard_line_ids'} or set(res.keys()) & {'account_ids', 'wizard_line_ids'}:
            return res

        if self.env.context.get('active_model') != 'account.account':
            raise UserError(_("This can only be used on accounts."))
        if len(self.env.context.get('active_ids') or []) < 2:
            raise UserError(_("You must select at least 2 accounts."))

        res['account_ids'] = [Command.set(self.env.context.get('active_ids'))]
        return res

    def _get_grouping_key(self, account):
        """ Return a grouping key for the given account.

        The key is the tuple of the account's type, non-trade flag, currency,
        reconcile flag and deprecated flag, plus its name when `is_group_by_name` is set.

        :param account: the account to compute the key for
        :return: tuple of field values
        """
        self.ensure_one()
        grouping_fields = ['account_type', 'non_trade', 'currency_id', 'reconcile', 'deprecated']
        if self.is_group_by_name:
            grouping_fields.append('name')
        return tuple(account[field] for field in grouping_fields)

    @api.depends('is_group_by_name', 'account_ids')
    def _compute_wizard_line_ids(self):
        """ Determine which accounts to merge together.

        For every group of accounts sharing a grouping key, one `line_section` line is
        created, followed by one pre-selected `account` line per account of the group.
        Existing lines are cleared first.
        """
        for wizard in self:
            # Filter out Bank / Cash accounts
            accounts = wizard.account_ids._origin.filtered(lambda a: a.account_type not in ('asset_bank', 'asset_cash'))

            wizard_lines_vals_list = []
            # Running counter giving each created line an increasing `sequence`.
            sequence = 0
            for grouping_key, group_accounts in accounts.grouped(wizard._get_grouping_key).items():
                grouping_key_str = str(grouping_key)
                wizard_lines_vals_list.append({
                    'display_type': 'line_section',
                    'grouping_key': grouping_key_str,
                    'sequence': (sequence := sequence + 1),
                    'account_id': group_accounts[0].id  # Used to compute the group name
                })
                for account in group_accounts:
                    wizard_lines_vals_list.append({
                        'display_type': 'account',
                        'account_id': account.id,
                        'grouping_key': grouping_key_str,
                        'is_selected': True,
                        'sequence': (sequence := sequence + 1),
                    })

            wizard.wizard_line_ids = [Command.clear()] + [
                Command.create(vals)
                for vals in wizard_lines_vals_list
            ]

    @api.depends('wizard_line_ids.is_selected', 'wizard_line_ids.info')
    def _compute_disable_merge_button(self):
        """ Disable the merge button unless at least one group has two or more
        selected account lines without an `info` message.
        """
        for wizard in self:
            wizard_lines_to_merge = wizard.wizard_line_ids.filtered(lambda l: l.display_type == 'account' and l.is_selected and not l.info)
            wizard.disable_merge_button = all(
                len(wizard_line_group) < 2
                for wizard_line_group in wizard_lines_to_merge.grouped('grouping_key').values()
            )

    def _get_window_action(self):
        """ Return the window action opening this wizard record in its form view,
        as a new-target window, with the current context.
        """
        self.ensure_one()
        return {
            'type': 'ir.actions.act_window',
            'name': _("Merge Accounts"),
            'view_id': self.env.ref('account.account_merge_wizard_form').id,
            'context': self.env.context,
            'res_model': 'account.merge.wizard',
            'res_id': self.id,
            'target': 'new',
            'view_mode': 'form',
        }

    def action_merge(self):
        """ Merge each group of accounts in `self.wizard_line_ids`.

        Only selected account lines without an `info` message are considered, and only
        groups of at least two such lines are merged.

        :return: a client action displaying a success notification and then closing
            the wizard window
        """
        self._check_access_rights(self.account_ids)

        for wizard in self:
            wizard_lines_selected = wizard.wizard_line_ids.filtered(lambda l: l.display_type == 'account' and l.is_selected and not l.info)
            for wizard_lines_group in wizard_lines_selected.grouped('grouping_key').values():
                if len(wizard_lines_group) > 1:
                    # This ensures that if one account in the group has hashed entries, it appears first, ensuring
                    # that its ID doesn't get changed by the merge.
                    self._action_merge(wizard_lines_group.sorted('account_has_hashed_entries', reverse=True).account_id)

        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'type': 'success',
                'sticky': False,
                'message': _("Accounts successfully merged!"),
                'next': {'type': 'ir.actions.act_window_close'},
            }
        }

    @api.model
    def _check_access_rights(self, accounts):
        """ Check that the current user may write on `accounts` and belongs to all
        of their companies.

        :param accounts: the accounts about to be merged
        :raises UserError: if some company of `accounts` is not among the user's companies
            (`check_access('write')` may raise on its own as well)
        """
        accounts.check_access('write')
        # Companies are read in sudo so that the ones the user cannot access are listed too.
        if forbidden_companies := (accounts.sudo().company_ids - self.env.user.company_ids):
            raise UserError(_(
                "You do not have the right to perform this operation as you do not have access to the following companies: %s.",
                ", ".join(c.name for c in forbidden_companies)
            ))

    @api.model
    def _action_merge(self, accounts):
        """ Merge `accounts`:
            - the first account is extended to each company of the others, keeping their codes and names;
            - the others are deleted; and
            - journal items and other references are retargeted to the first account.

        Calling this with fewer than two accounts is a no-op.

        :param accounts: accounts to merge; the first one is the account that is kept
        :raises UserError: see `_check_access_rights`
        """
        # There is nothing to merge with fewer than two accounts. Returning early also
        # avoids indexing an empty recordset and sending an empty tuple to the
        # `IN %(...)s` clauses below, which would not be valid SQL.
        if len(accounts) < 2:
            return

        # Step 1: Keep track of the company_ids and codes we should write on the account.
        # We will do so only at the end, to avoid triggering the constraint that prevents duplicate codes.
        company_ids_to_write = accounts.sudo().company_ids
        code_by_company = {}
        all_root_companies = self.env['res.company'].sudo().search([('parent_id', '=', False)])
        for account in accounts:
            # Codes in the root companies the account belongs to are always kept...
            for company in account.company_ids & all_root_companies:
                code_by_company[company] = account.with_company(company).sudo().code

            # ...whereas for the other root companies, only a non-empty code is kept.
            for company in all_root_companies - account.company_ids:
                if code := account.with_company(company).sudo().code:
                    code_by_company[company] = code

        account_to_merge_into = accounts[0]
        accounts_to_remove = accounts[1:]

        # Step 2: Check that we have write access to all the accounts and access to all the companies
        # of these accounts.
        self._check_access_rights(accounts)

        # Step 3: Update records in DB.
        # 3.1: Update foreign keys in DB
        wiz = self.env['base.partner.merge.automatic.wizard'].new()
        wiz._update_foreign_keys_generic('account.account', accounts_to_remove, account_to_merge_into)

        # 3.2: Update Reference and Many2OneReference fields that reference account.account
        wiz._update_reference_fields_generic('account.account', accounts_to_remove, account_to_merge_into)

        # 3.3: Merge translations
        account_names = self.env.execute_query(SQL(
            """
            SELECT id, name
            FROM account_account
            WHERE id IN %(account_ids)s
            """,
            account_ids=tuple(accounts.ids),
        ))
        account_name_by_id = dict(account_names)

        # Construct JSON of name translations, with first account taking precedence.
        # Iterating in reverse order makes the first account's values be applied last.
        merged_account_name = {}
        for account_id in accounts.ids[::-1]:
            merged_account_name.update(account_name_by_id[account_id])

        self.env.cr.execute(SQL(
            """
            UPDATE account_account
            SET name = %(account_name_json)s
            WHERE id = %(account_to_merge_into_id)s
            """,
            account_name_json=json.dumps(merged_account_name),
            account_to_merge_into_id=account_to_merge_into.id,
        ))

        # Step 4: Remove merged accounts
        # The statements above and below are raw SQL; the environment caches are
        # invalidated here (presumably so that no stale value is used afterwards).
        self.env.invalidate_all()
        self.env.cr.execute(SQL(
            """
            DELETE FROM account_account
            WHERE id IN %(account_ids_to_delete)s
            """,
            account_ids_to_delete=tuple(accounts_to_remove.ids),
        ))

        # Clear ir.model.data ormcache
        self.env.registry.clear_cache()

        # Step 5: Write company_ids and codes on the account
        for company, code in code_by_company.items():
            account_to_merge_into.with_company(company).sudo().code = code

        account_to_merge_into.sudo().company_ids = company_ids_to_write
class AccountMergeWizardLine(models.TransientModel):
    # A row of the account merge wizard: either the header of a merge group
    # (`line_section`) or one account belonging to that group (`account`).
    _name = 'account.merge.wizard.line'
    _description = "Account merge wizard line"
    _order = 'sequence, id'

    wizard_id = fields.Many2one('account.merge.wizard', required=True, ondelete='cascade')
    # String form of the wizard's grouping key; lines sharing it form one merge group.
    grouping_key = fields.Char()
    sequence = fields.Integer()
    display_type = fields.Selection(
        [
            ('line_section', "Section"),
            ('account', "Account"),
        ],
        required=True,
    )
    is_selected = fields.Boolean()
    account_id = fields.Many2one(
        'account.account',
        string="Account",
        ondelete='cascade',
        readonly=True,
    )
    company_ids = fields.Many2many(string="Companies", related='account_id.company_ids')
    info = fields.Char(
        string='Info',
        compute='_compute_info',
        help="Contains either the section name or error message, depending on the line type."
    )
    account_has_hashed_entries = fields.Boolean(compute='_compute_account_has_hashed_entries')

    @api.depends('account_id')
    def _compute_account_has_hashed_entries(self):
        """ Flag the lines whose account has at least one journal item belonging to
        a move with an inalterable hash.

        A single query is run for all the lines at once.
        """
        hashed_items_domain = [
            ('account_id', 'in', self.account_id.ids),
            ('move_id.inalterable_hash', '!=', False),
        ]
        hashed_items_query = self.env['account.move.line']._where_calc(hashed_items_domain)
        rows = self.env.execute_query(
            hashed_items_query.select(SQL('DISTINCT account_move_line.account_id'))
        )
        hashed_account_ids = {account_id for (account_id,) in rows}
        for line in self:
            line.account_has_hashed_entries = line.account_id.id in hashed_account_ids

    @api.depends('account_id', 'wizard_id.wizard_line_ids.is_selected', 'display_type')
    def _compute_info(self):
        """ Fill `info` with the group name on section lines and with an error message
        (or False) on account lines.

        The computation is triggered whenever a line of the wizard gets selected or deselected.
        Whether an account can be merged only depends on the other accounts of its own merge
        group, so account lines are processed one group at a time by
        `_apply_different_companies_constraint` and `_apply_hashed_moves_constraint`.
        """
        section_lines = self.filtered(lambda line: line.display_type == 'line_section')
        account_lines = self.filtered(lambda line: line.display_type == 'account')

        for section in section_lines:
            section.info = section._get_group_name()

        lines_by_group = account_lines.grouped(lambda line: (line.wizard_id, line.grouping_key))
        for group_lines in lines_by_group.values():
            # Start from a clean slate, then let each constraint flag the
            # lines of the group that cannot be merged.
            group_lines.info = False
            group_lines._apply_different_companies_constraint()
            group_lines._apply_hashed_moves_constraint()

    def _get_group_name(self):
        """ Build the human-readable name of this line's group from its `account_id`, e.g.:
            '{Trade/Non-trade} Receivable {USD} {Reconcilable} {Deprecated}'
        """
        self.ensure_one()
        account = self.account_id

        selection_labels = dict(self.pool['account.account'].account_type._description_selection(self.env))
        type_label = selection_labels[account.account_type]
        if account.account_type in ['asset_receivable', 'liability_payable']:
            if account.non_trade:
                type_label = _("Non-trade %s", type_label)
            else:
                type_label = _("Trade %s", type_label)

        qualifiers = []
        if account.currency_id:
            qualifiers.append(account.currency_id.name)
        if account.reconcile:
            qualifiers.append(_("Reconcilable"))
        if account.deprecated:
            qualifiers.append(_("Deprecated"))

        if self.wizard_id.is_group_by_name:
            # The account name leads; the type and qualifiers go in the brackets.
            return f'{account.name} ({", ".join([type_label] + qualifiers)})'
        if qualifiers:
            return f'{type_label} ({", ".join(qualifiers)})'
        return type_label

    def _apply_different_companies_constraint(self):
        """ Flag (through `info`) the selected lines whose account shares a company
            with an account of a previous line: such accounts cannot be merged.
            Users wanting that should mass-edit the account on the journal items instead.

            All the wizard lines in `self` are expected to share the same `grouping_key`.
        """
        seen_companies = self.env['res.company']
        first_account_by_company = {}
        for line in self:
            if not line.is_selected or line.info:
                continue

            overlap = line.company_ids & seen_companies
            if overlap:
                line.info = _(
                    "Belongs to the same company as %s.",
                    first_account_by_company[overlap[0]].display_name
                )
                continue

            seen_companies |= line.company_ids
            for company in line.company_ids:
                first_account_by_company.setdefault(company, line.account_id)

    def _apply_hashed_moves_constraint(self):
        """ Flag (through `info`) the selected lines whose account has hashed entries
            while a previous line's account already has some.

            An account with hashed entries must keep its ID through the merge,
            so a group can hold at most one such account.

            All the wizard lines in `self` are expected to share the same `grouping_key`.
        """
        hashed_account = None
        for line in self:
            if not (line.is_selected and not line.info and line.account_has_hashed_entries):
                continue

            if hashed_account:
                line.info = _(
                    "Contains hashed entries, but %s also has hashed entries.",
                    hashed_account.display_name
                )
            else:
                hashed_account = line.account_id