Odoo18-Base/addons/l10n_in_withholding/models/account_move.py
2025-01-06 10:57:38 +07:00

228 lines
10 KiB
Python

from odoo import api, models, fields, _, Command
from odoo.tools import SQL
from odoo.tools.date_utils import get_month
class AccountMove(models.Model):
_inherit = "account.move"
l10n_in_is_withholding = fields.Boolean(
string="Is Indian TDS Entry",
copy=False,
help="Technical field to identify Indian withholding entry"
)
l10n_in_withholding_ref_move_id = fields.Many2one(
comodel_name='account.move',
string="Indian TDS Ref Move",
readonly=True,
index='btree_not_null',
copy=False,
help="Reference move for withholding entry",
)
l10n_in_withhold_move_ids = fields.One2many(
'account.move', 'l10n_in_withholding_ref_move_id',
string="Indian TDS Entries"
)
l10n_in_withholding_line_ids = fields.One2many(
'account.move.line', 'move_id',
string="Indian TDS Lines",
compute='_compute_l10n_in_withholding_line_ids',
)
l10n_in_total_withholding_amount = fields.Monetary(
string="Total Indian TDS Amount",
compute='_compute_l10n_in_total_withholding_amount',
help="Total withholding amount for the move",
)
l10n_in_tcs_tds_warning = fields.Char('TDC/TCS Warning', compute="_compute_l10n_in_tcs_tds_warning")
l10n_in_display_higher_tcs_button = fields.Boolean(string="Display higher TCS button", compute="_compute_l10n_in_display_higher_tcs_button")
# === Compute Methods ===
@api.depends('line_ids', 'l10n_in_is_withholding')
def _compute_l10n_in_withholding_line_ids(self):
# Compute the withholding lines for the move
for move in self:
if move.l10n_in_is_withholding:
move.l10n_in_withholding_line_ids = move.line_ids.filtered('tax_ids')
else:
move.l10n_in_withholding_line_ids = False
def _compute_l10n_in_total_withholding_amount(self):
for move in self:
move.l10n_in_total_withholding_amount = sum(move.l10n_in_withhold_move_ids.filtered(
lambda m: m.state == 'posted').l10n_in_withholding_line_ids.mapped('l10n_in_withhold_tax_amount'))
def _get_l10n_in_invalid_tax_lines(self):
self.ensure_one()
if self.country_code == 'IN' and not self.commercial_partner_id.l10n_in_pan:
lines = self.env['account.move.line']
for line in self.invoice_line_ids:
for tax in line.tax_ids:
if (
tax.l10n_in_section_id.tax_source_type == 'tcs'
and tax.amount != max(tax.l10n_in_section_id.l10n_in_section_tax_ids, key=lambda t: abs(t.amount)).amount
):
lines |= line._origin
return lines
@api.depends('invoice_line_ids.tax_ids', 'commercial_partner_id.l10n_in_pan')
def _compute_l10n_in_warning(self):
super()._compute_l10n_in_warning()
for move in self:
warnings = move.l10n_in_warning or {}
lines = move._get_l10n_in_invalid_tax_lines()
if lines:
warnings['lower_tcs_tax'] = {
'message': _("As the Partner's PAN missing/invalid apply TCS at the higher rate."),
'action_text': _("View Journal Items(s)"),
'action': lines._get_records_action(
name=_("Journal Items(s)"),
target='current',
views=[(self.env.ref("l10n_in_withholding.view_move_line_tree_l10n_in").id, "list")],
domain=[('id', 'in', lines.ids)]
)
}
move.l10n_in_warning = warnings
def action_l10n_in_apply_higher_tax(self):
self.ensure_one()
invalid_lines = self._get_l10n_in_invalid_tax_lines()
for line in invalid_lines:
updated_tax_ids = []
for tax in line.tax_ids:
if tax.l10n_in_section_id.tax_source_type == 'tcs':
max_tax = max(
tax.l10n_in_section_id.l10n_in_section_tax_ids,
key=lambda t: t.amount
)
updated_tax_ids.append(max_tax.id)
else:
updated_tax_ids.append(tax.id)
if set(line.tax_ids.ids) != set(updated_tax_ids):
line.write({'tax_ids': [Command.clear()] + [Command.set(updated_tax_ids)]})
@api.depends('l10n_in_warning')
def _compute_l10n_in_display_higher_tcs_button(self):
for move in self:
move.l10n_in_display_higher_tcs_button = (
move.l10n_in_warning
and move.l10n_in_warning.get('lower_tcs_tax')
)
def action_l10n_in_withholding_entries(self):
self.ensure_one()
return {
'name': "TDS Entries",
'type': 'ir.actions.act_window',
'res_model': 'account.move',
'view_mode': 'list,form',
'domain': [('id', 'in', self.l10n_in_withhold_move_ids.ids)],
}
def _get_sections_aggregate_sum_by_pan(self, section_alert, commercial_partner_id):
self.ensure_one()
month_start_date, month_end_date = get_month(self.date)
company_fiscalyear_dates = self.company_id.compute_fiscalyear_dates(self.date)
fiscalyear_start_date, fiscalyear_end_date = company_fiscalyear_dates['date_from'], company_fiscalyear_dates['date_to']
default_domain = [
('account_id.l10n_in_tds_tcs_section_id', '=', section_alert.id),
('move_id.move_type', '!=', 'entry'),
('company_id', 'child_of', self.company_id.root_id.id),
('parent_state', '=', 'posted')
]
if commercial_partner_id.l10n_in_pan:
default_domain += [('move_id.commercial_partner_id.l10n_in_pan', '=', commercial_partner_id.l10n_in_pan)]
else:
default_domain += [('move_id.commercial_partner_id', '=', commercial_partner_id.id)]
frequency_domains = {
'monthly': [('date', '>=', month_start_date), ('date', '<=', month_end_date)],
'fiscal_yearly': [('date', '>=', fiscalyear_start_date), ('date', '<=', fiscalyear_end_date)],
}
aggregate_result = {}
for frequency, frequency_domain in frequency_domains.items():
query = self.env['account.move.line']._where_calc(default_domain + frequency_domain)
result = self.env.execute_query_dict(SQL(
"""
SELECT COALESCE(sum(account_move_line.balance), 0) as balance,
COALESCE(sum(account_move_line.price_total * am.invoice_currency_rate), 0) as price_total
FROM %s
JOIN account_move AS am ON am.id = account_move_line.move_id
WHERE %s
""",
query.from_clause,
query.where_clause)
)
aggregate_result[frequency] = result[0]
return aggregate_result
def _l10n_in_is_warning_applicable(self, section_id):
self.ensure_one()
match section_id.tax_source_type:
case 'tcs':
return self.journal_id.type == 'sale'
case 'tds':
return (
self.journal_id.type == 'purchase'
and section_id not in self.l10n_in_withhold_move_ids.filtered(lambda m:
m.state == 'posted'
).mapped('line_ids.tax_ids.l10n_in_section_id')
)
case _:
return False
@api.depends('invoice_line_ids.price_total')
def _compute_l10n_in_tcs_tds_warning(self):
def _group_by_section_alert(invoice_lines):
group_by_lines = {}
for line in invoice_lines:
group_key = line.account_id.l10n_in_tds_tcs_section_id
if group_key and not line.company_currency_id.is_zero(line.price_total):
group_by_lines.setdefault(group_key, [])
group_by_lines[group_key].append(line)
return group_by_lines
def _is_section_applicable(section_alert, threshold_sums, invoice_currency_rate, lines):
lines_total = sum(
(line.price_total * invoice_currency_rate) if section_alert.consider_amount == 'total_amount' else line.balance
for line in lines
)
if section_alert.is_aggregate_limit:
aggregate_period_key = section_alert.consider_amount == 'total_amount' and 'price_total' or 'balance'
aggregate_total = threshold_sums.get(section_alert.aggregate_period, {}).get(aggregate_period_key)
if move.state == 'draft':
aggregate_total += lines_total
if aggregate_total > section_alert.aggregate_limit:
return True
return (
section_alert.is_per_transaction_limit
and lines_total > section_alert.per_transaction_limit
)
for move in self:
if move.country_code == 'IN' and move.move_type in ['in_invoice', 'out_invoice']:
warning = set()
commercial_partner_id = move.commercial_partner_id
existing_section = (self.l10n_in_withhold_move_ids.line_ids + move.line_ids).tax_ids.l10n_in_section_id
for section_alert, lines in _group_by_section_alert(move.invoice_line_ids).items():
if (
(section_alert not in existing_section
or [line for line in lines if section_alert not in line.tax_ids.l10n_in_section_id])
and move._l10n_in_is_warning_applicable(section_alert)
and _is_section_applicable(
section_alert,
move._get_sections_aggregate_sum_by_pan(
section_alert,
commercial_partner_id
),
move.invoice_currency_rate,
lines
)
):
warning.add(section_alert.id)
warning_sections = self.env['l10n_in.section.alert'].browse(warning)
if warning_sections:
move.l10n_in_tcs_tds_warning = warning_sections._get_warning_message()
else:
move.l10n_in_tcs_tds_warning = False
else:
move.l10n_in_tcs_tds_warning = False