293 lines
16 KiB
Python
293 lines
16 KiB
Python
# -*- coding: utf-8 -*-
|
||
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
import logging
|
||
import json
|
||
import re
|
||
from datetime import datetime
|
||
|
||
from odoo import models, fields, _, api
|
||
from odoo.exceptions import UserError
|
||
|
||
_logger = logging.getLogger(__name__)
|
||
|
||
class AccountMove(models.Model):
|
||
_inherit = 'account.move'
|
||
|
||
l10n_ke_cu_datetime = fields.Datetime(string='CU Signing Date and Time', copy=False)
|
||
l10n_ke_cu_serial_number = fields.Char(string='CU Serial Number', copy=False)
|
||
l10n_ke_cu_invoice_number = fields.Char(string='CU Invoice Number', copy=False)
|
||
l10n_ke_cu_qrcode = fields.Char(string='CU QR Code', copy=False)
|
||
l10n_ke_cu_show_send_button = fields.Boolean(string='Show Send to Tremol button', compute='_compute_l10n_ke_cu_show_send_button')
|
||
|
||
@api.depends('country_code', 'l10n_ke_cu_qrcode', 'state', 'move_type', 'company_id')
|
||
def _compute_l10n_ke_cu_show_send_button(self):
|
||
for move in self:
|
||
move.l10n_ke_cu_show_send_button = (
|
||
move.country_code == 'KE'
|
||
and not move.l10n_ke_cu_qrcode
|
||
and move.state == 'posted'
|
||
and move.move_type in ['out_invoice', 'out_refund']
|
||
and not move.company_id.l10n_ke_oscu_is_active
|
||
)
|
||
|
||
# -------------------------------------------------------------------------
|
||
# HELPERS
|
||
# -------------------------------------------------------------------------
|
||
|
||
def _l10n_ke_fmt(self, string, length, ljust=True):
|
||
""" Function for common formatting behaviour
|
||
|
||
:param string: string to be formatted/encoded
|
||
:param length: integer length to justify (if enabled), and then truncate the string to
|
||
:param ljust: boolean representing whether the string should be justified
|
||
:returns: byte-string justified/truncated, with all non-alphanumeric characters removed
|
||
"""
|
||
if not string:
|
||
string = ''
|
||
return re.sub('[^A-Za-z0-9 ]+', '', str(string)).encode('cp1251').ljust(length if ljust else 0)[:length]
|
||
|
||
# -------------------------------------------------------------------------
|
||
# CHECKS
|
||
# -------------------------------------------------------------------------
|
||
|
||
def _l10n_ke_validate_move(self):
|
||
""" Returns list of errors related to misconfigurations per move
|
||
|
||
Find misconfigurations on the move, the lines of the move, and the
|
||
taxes on those lines that would result in rejection by the KRA.
|
||
"""
|
||
errors = []
|
||
for move in self:
|
||
move_errors = []
|
||
if move.country_code != 'KE':
|
||
move_errors.append(_("This invoice is not a Kenyan invoice and therefore can not be sent to the device."))
|
||
|
||
if move.company_id.currency_id != self.env.ref('base.KES'):
|
||
move_errors.append(_("This invoice's company currency is not in Kenyan Shillings, conversion to KES is not possible."))
|
||
|
||
if move.state != 'posted':
|
||
move_errors.append(_("This invoice/credit note has not been posted. Please confirm it to continue."))
|
||
|
||
if move.move_type not in ('out_refund', 'out_invoice'):
|
||
move_errors.append(_("The document being sent should be an invoice or credit note."))
|
||
|
||
if any([move.l10n_ke_cu_invoice_number, move.l10n_ke_cu_serial_number, move.l10n_ke_cu_qrcode, move.l10n_ke_cu_datetime]):
|
||
move_errors.append(_("The document already has details related to the fiscal device. Please make sure that the invoice has not already been sent."))
|
||
|
||
# The credit note should refer to the control unit number (receipt number) of the original
|
||
# invoice to which it relates.
|
||
if move.move_type == 'out_refund' and not move.reversed_entry_id.l10n_ke_cu_invoice_number:
|
||
move_errors.append(_("This credit note must reference the previous invoice, and this previous invoice must have already been submitted."))
|
||
|
||
for line in self.invoice_line_ids.filtered(lambda l: l.display_type == 'product'):
|
||
vat_taxes = line.tax_ids.filtered(lambda tax: tax.amount in (16, 8, 0))
|
||
if not vat_taxes or len(vat_taxes) > 1:
|
||
move_errors.append(_("On line %s, you must select one and only one VAT tax.", line.name))
|
||
else:
|
||
if vat_taxes[0].amount == 0 and not line.tax_ids[0].l10n_ke_item_code_id:
|
||
move_errors.append(_("On line %s, a tax with a KRA item code must be selected, since the tax is 0%% or exempt.", line.name))
|
||
|
||
if move_errors:
|
||
errors.append((move.name, move_errors))
|
||
|
||
return errors
|
||
|
||
def _l10n_ke_fiscal_device_details_filled(self):
|
||
self.ensure_one()
|
||
# If the company is configured for OSCU, don't block the Send & Print.
|
||
if self.company_id.l10n_ke_oscu_is_active:
|
||
return True
|
||
return all([
|
||
self.country_code == 'KE',
|
||
self.l10n_ke_cu_invoice_number,
|
||
self.l10n_ke_cu_serial_number,
|
||
self.l10n_ke_cu_qrcode,
|
||
self.l10n_ke_cu_datetime,
|
||
])
|
||
|
||
# -------------------------------------------------------------------------
|
||
# SERIALISERS
|
||
# -------------------------------------------------------------------------
|
||
|
||
def _l10n_ke_cu_open_invoice_message(self):
|
||
""" Serialise the required fields for opening an invoice
|
||
|
||
:returns: a list containing one byte-string representing the <CMD> and
|
||
<DATA> of the message sent to the fiscal device.
|
||
"""
|
||
headquarter_address = (self.commercial_partner_id.street or '') + (self.commercial_partner_id.street2 or '')
|
||
customer_address = (self.partner_id.street or '') + (self.partner_id.street2 or '')
|
||
postcode_and_city = (self.partner_id.zip or '') + '' + (self.partner_id.city or '')
|
||
vat = (self.commercial_partner_id.vat or '').strip() if self.commercial_partner_id.country_id.code == 'KE' else ''
|
||
invoice_elements = [
|
||
b'1', # Reserved - 1 symbol with value '1'
|
||
b' 0', # Reserved - 6 symbols with value ‘ 0’
|
||
b'0', # Reserved - 1 symbol with value '0'
|
||
b'1' if self.move_type == 'out_invoice' else b'A', # 1 symbol with value '1' (new invoice), 'A' (credit note), or '@' (debit note)
|
||
self._l10n_ke_fmt(self.commercial_partner_id.name, 30), # 30 symbols for Company name
|
||
self._l10n_ke_fmt(vat, 14), # 14 Symbols for the client PIN number
|
||
self._l10n_ke_fmt(headquarter_address, 30), # 30 Symbols for customer headquarters
|
||
self._l10n_ke_fmt(customer_address, 30), # 30 Symbols for the address
|
||
self._l10n_ke_fmt(postcode_and_city, 30), # 30 symbols for the customer post code and city
|
||
self._l10n_ke_fmt('', 30), # 30 symbols for the exemption number
|
||
]
|
||
if self.move_type == 'out_refund':
|
||
invoice_elements.append(self._l10n_ke_fmt(self.reversed_entry_id.l10n_ke_cu_invoice_number, 19)), # 19 symbols for related invoice number
|
||
invoice_elements.append(re.sub('[^A-Za-z0-9 ]+', '', self.name)[-15:].ljust(15).encode('cp1251')) # 15 symbols for trader system invoice number
|
||
|
||
# Command: Open fiscal record (0x30)
|
||
return [b'\x30' + b';'.join(invoice_elements)]
|
||
|
||
def _l10n_ke_cu_lines_messages(self):
|
||
""" Serialise the data of each line on the invoice
|
||
|
||
This function transforms the lines in order to handle the differences
|
||
between the KRA expected data and the lines in odoo.
|
||
|
||
If a discount line (as a negative line) has been added to the invoice
|
||
lines, find a suitable line/lines to distribute the discount accross
|
||
|
||
:returns: List of byte-strings representing each command <CMD> and the
|
||
<DATA> of the line, which will be sent to the fiscal device
|
||
in order to add a line to the opened invoice.
|
||
"""
|
||
def is_discount_line(line):
|
||
return line.price_subtotal < 0.0
|
||
|
||
def is_candidate(discount_line, other_line):
|
||
""" If the of one line match those of the discount line, the discount can be distributed accross that line """
|
||
discount_taxes = discount_line.tax_ids.flatten_taxes_hierarchy()
|
||
other_line_taxes = other_line.tax_ids.flatten_taxes_hierarchy()
|
||
return set(discount_taxes.ids) == set(other_line_taxes.ids)
|
||
|
||
lines = self.invoice_line_ids.filtered(lambda l: l.display_type == 'product' and l.quantity and l.price_total)
|
||
# The device expects all monetary values in Kenyan Shillings
|
||
if self.currency_id == self.company_id.currency_id:
|
||
currency_rate = 1
|
||
# In the case of a refund, use the currency rate of the original invoice
|
||
elif self.move_type == 'out_refund' and self.reversed_entry_id:
|
||
currency_rate = abs(self.reversed_entry_id.amount_total_signed / self.reversed_entry_id.amount_total)
|
||
else:
|
||
currency_rate = abs(self.amount_total_signed / self.amount_total)
|
||
|
||
discount_dict = {line.id: line.discount for line in lines if line.price_total > 0}
|
||
for line in lines:
|
||
if not is_discount_line(line):
|
||
continue
|
||
# Search for non-discount lines
|
||
candidate_vals_list = [l for l in lines if not is_discount_line(l) and is_candidate(l, line)]
|
||
candidate_vals_list = sorted(candidate_vals_list, key=lambda x: x.price_unit * x.quantity, reverse=True)
|
||
line_to_discount = abs(line.price_unit * line.quantity)
|
||
for candidate in candidate_vals_list:
|
||
still_to_discount = abs(candidate.price_unit * candidate.quantity * (100.0 - discount_dict[candidate.id]) / 100.0)
|
||
if line_to_discount >= still_to_discount:
|
||
discount_dict[candidate.id] = 100.0
|
||
line_to_discount -= still_to_discount
|
||
else:
|
||
rest_to_discount = abs((line_to_discount / (candidate.price_unit * candidate.quantity)) * 100.0)
|
||
discount_dict[candidate.id] += rest_to_discount
|
||
break
|
||
|
||
msgs = []
|
||
tax_details = self._prepare_invoice_aggregated_taxes()
|
||
for line in self.invoice_line_ids.filtered(lambda l: l.display_type == 'product' and l.quantity and l.price_total > 0 and not discount_dict.get(l.id) >= 100):
|
||
# Here we use the original discount of the line, since it the distributed discount has not been applied in the price_total
|
||
price_total = 0
|
||
percentage = 0
|
||
item_code = line.tax_ids[0].l10n_ke_item_code_id
|
||
for tax in tax_details['tax_details_per_record'][line]['tax_details']:
|
||
if tax.amount in (16, 8, 0): # This should only occur once
|
||
line_tax_details = tax_details['tax_details_per_record'][line]['tax_details'][tax]
|
||
price_total = abs(line_tax_details['base_amount_currency']) + abs(line_tax_details['tax_amount_currency'])
|
||
percentage = tax.amount
|
||
price = round(price_total / abs(line.quantity) * 100 / (100 - line.discount), 2) * currency_rate
|
||
price = ('%.5f' % price).rstrip('0').rstrip('.')
|
||
uom = line.product_uom_id and line.product_uom_id.name or ''
|
||
|
||
line_data = b';'.join([
|
||
self._l10n_ke_fmt(line.product_id.display_name or line.name, 36), # 36 symbols for the article's name
|
||
self._l10n_ke_fmt(item_code.tax_rate or 'A', 1), # 1 symbol for article's vat class ('A', 'B', 'C', 'D', or 'E')
|
||
price[:15].encode('cp1251'), # 1 to 15 symbols for article's price with up to 5 digits after decimal point
|
||
self._l10n_ke_fmt(uom, 3), # 3 symbols for unit of measure
|
||
(item_code.code or '').ljust(10).encode('cp1251'), # 10 symbols for KRA item code in the format xxxx.xx.xx (can be empty)
|
||
self._l10n_ke_fmt(item_code.description or '', 20), # 20 symbols for KRA item code description (can be empty)
|
||
str(percentage).encode('cp1251')[:5] # up to 5 symbols for vat rate
|
||
])
|
||
# 1 to 10 symbols for quantity
|
||
line_data += b'*' + str(abs(line.quantity)).encode('cp1251')[:10]
|
||
if discount_dict.get(line.id):
|
||
# 1 to 7 symbols for percentage of discount/addition
|
||
discount_sign = b'-' if discount_dict[line.id] > 0 else b'+'
|
||
discount = discount_sign + str(abs(discount_dict[line.id])).encode('cp1251')[:6]
|
||
line_data += b',' + discount + b'%'
|
||
|
||
# Command: Sale of article (0x31)
|
||
msgs += [b'\x31' + line_data]
|
||
return msgs
|
||
|
||
def _l10n_ke_get_cu_messages(self):
|
||
""" Composes a list of all the command and data parts of the messages
|
||
required for the fiscal device to open an invoice, add lines and
|
||
subsequently close it.
|
||
"""
|
||
self.ensure_one()
|
||
msgs = self._l10n_ke_cu_open_invoice_message()
|
||
msgs += self._l10n_ke_cu_lines_messages()
|
||
# Command: Close fiscal reciept (0x38)
|
||
msgs += [b'\x38']
|
||
# Command: Read date and time (0x68)
|
||
msgs += [b'\x68']
|
||
return msgs
|
||
|
||
# -------------------------------------------------------------------------
|
||
# POST COMMANDS / RECEIVE DATA
|
||
# -------------------------------------------------------------------------
|
||
|
||
def l10n_ke_action_cu_post(self):
|
||
""" Returns the client action descriptor dictionary for sending the
|
||
invoice(s) to the control unit (the fiscal device).
|
||
"""
|
||
# If l10n_ke_edi_oscu is configured for the company, disable sending via TREMOL.
|
||
if self.company_id.l10n_ke_oscu_is_active:
|
||
raise UserError(
|
||
_('An OSCU has been initialized for this company. Please send the e-invoice via Send and Print -> Send to eTIMS instead.')
|
||
)
|
||
# Check the configuration of the invoice
|
||
errors = self._l10n_ke_validate_move()
|
||
if errors:
|
||
error_msg = ""
|
||
for move, error_list in errors:
|
||
error_list = '\n'.join(error_list)
|
||
error_msg += _("Invalid invoice configuration on %(invoice)s:\n%(error_list)s\n\n", invoice=move, error_list=error_list)
|
||
raise UserError(error_msg)
|
||
return {
|
||
'type': 'ir.actions.client',
|
||
'tag': 'l10n_ke_post_send',
|
||
'params': [
|
||
{
|
||
'move_id': move.id,
|
||
'messages': json.dumps([msg.decode('cp1251') for msg in move._l10n_ke_get_cu_messages()]),
|
||
'proxy_address': move.company_id.l10n_ke_cu_proxy_address,
|
||
'company_vat': move.company_id.vat,
|
||
'name': move.name,
|
||
} for move in self
|
||
]
|
||
}
|
||
|
||
def l10n_ke_cu_responses(self, responses):
|
||
""" Set the fields related to the fiscal device on the invoice.
|
||
|
||
This is intended to be utilized by an RPC call from the javascript
|
||
client action. The fields are prefixed with l10n_ke_cu_*, which refers
|
||
to the fact that they originate from the control unit.
|
||
"""
|
||
for response in responses:
|
||
move = self.browse(int(response['move_id']))
|
||
replies = [msg for msg in response['replies']]
|
||
move.update({
|
||
'l10n_ke_cu_serial_number': response['serial_number'],
|
||
'l10n_ke_cu_invoice_number': replies[-2].split(';')[0],
|
||
'l10n_ke_cu_qrcode': replies[-2].split(';')[1].strip(),
|
||
'l10n_ke_cu_datetime': datetime.strptime(replies[-1], '%d-%m-%Y %H:%M'),
|
||
})
|