2025-01-06 10:57:38 +07:00

293 lines
16 KiB
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import json
import re
from datetime import datetime
from odoo import models, fields, _, api
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class AccountMove(models.Model):
    """ Kenyan fiscal device (control unit) support on journal entries.

    Adds the ``l10n_ke_cu_*`` fields that hold the control unit's replies, the
    checks run before a document is sent, the serialisation of an invoice or
    credit note into the device's command messages, and the entry points used
    by the client action that talks to the device.
    """
    _inherit = 'account.move'

    # Values reported back by the control unit (written by l10n_ke_cu_responses);
    # none of them is carried over when a move is duplicated.
    l10n_ke_cu_datetime = fields.Datetime(string='CU Signing Date and Time', copy=False)
    l10n_ke_cu_serial_number = fields.Char(string='CU Serial Number', copy=False)
    l10n_ke_cu_invoice_number = fields.Char(string='CU Invoice Number', copy=False)
    l10n_ke_cu_qrcode = fields.Char(string='CU QR Code', copy=False)
    # Technical flag, see _compute_l10n_ke_cu_show_send_button.
    l10n_ke_cu_show_send_button = fields.Boolean(string='Show Send to Tremol button', compute='_compute_l10n_ke_cu_show_send_button')

    @api.depends('country_code', 'l10n_ke_cu_qrcode', 'state', 'move_type', 'company_id')
    def _compute_l10n_ke_cu_show_send_button(self):
        """ Show the send button on posted Kenyan customer invoices / credit notes
        that have no CU QR code yet and whose company has no active OSCU.
        """
        for move in self:
            move.l10n_ke_cu_show_send_button = (
                move.country_code == 'KE'
                and not move.l10n_ke_cu_qrcode
                and move.state == 'posted'
                and move.move_type in ('out_invoice', 'out_refund')
                and not move.company_id.l10n_ke_oscu_is_active
            )

    # -------------------------------------------------------------------------
    # HELPERS
    # -------------------------------------------------------------------------

    def _l10n_ke_fmt(self, string, length, ljust=True):
        """ Function for common formatting behaviour

        Every character other than an ASCII letter, digit or space is removed
        before encoding, so the cp1251 encoding only ever sees ASCII.

        :param string: value to be formatted/encoded (any falsy value is treated as '')
        :param length: integer length to justify (if enabled), and then truncate the string to
        :param ljust: boolean representing whether the string should be justified
        :returns: byte-string justified/truncated, with all non-alphanumeric characters removed
        """
        if not string:
            string = ''
        cleaned = re.sub('[^A-Za-z0-9 ]+', '', str(string)).encode('cp1251')
        # ljust(0) is a no-op, so with ljust=False the value is only truncated.
        return cleaned.ljust(length if ljust else 0)[:length]

    # -------------------------------------------------------------------------
    # CHECKS
    # -------------------------------------------------------------------------

    def _l10n_ke_validate_move(self):
        """ Returns list of errors related to misconfigurations per move

        Find misconfigurations on the move, the lines of the move, and the
        taxes on those lines that would result in rejection by the KRA.

        :returns: list of ``(move name, [error messages])`` tuples, one for each
                  move having at least one error; empty when every move passes
        """
        errors = []
        kes = self.env.ref('base.KES')
        for move in self:
            move_errors = []
            if move.country_code != 'KE':
                move_errors.append(_("This invoice is not a Kenyan invoice and therefore can not be sent to the device."))
            if move.company_id.currency_id != kes:
                move_errors.append(_("This invoice's company currency is not in Kenyan Shillings, conversion to KES is not possible."))
            if move.state != 'posted':
                move_errors.append(_("This invoice/credit note has not been posted. Please confirm it to continue."))
            if move.move_type not in ('out_refund', 'out_invoice'):
                move_errors.append(_("The document being sent should be an invoice or credit note."))
            if any([move.l10n_ke_cu_invoice_number, move.l10n_ke_cu_serial_number, move.l10n_ke_cu_qrcode, move.l10n_ke_cu_datetime]):
                move_errors.append(_("The document already has details related to the fiscal device. Please make sure that the invoice has not already been sent."))
            # The credit note should refer to the control unit number (receipt number) of the original
            # invoice to which it relates.
            if move.move_type == 'out_refund' and not move.reversed_entry_id.l10n_ke_cu_invoice_number:
                move_errors.append(_("This credit note must reference the previous invoice, and this previous invoice must have already been submitted."))

            # Only the lines of the move being checked: going through `self` here would
            # report the line errors of every move of the recordset on each move.
            for line in move.invoice_line_ids.filtered(lambda l: l.display_type == 'product'):
                vat_taxes = line.tax_ids.filtered(lambda tax: tax.amount in (16, 8, 0))
                if len(vat_taxes) != 1:
                    move_errors.append(_("On line %s, you must select one and only one VAT tax.", line.name))
                # Checked only when exactly one VAT tax exists, so indexing cannot fail.
                elif vat_taxes[0].amount == 0 and not line.tax_ids[0].l10n_ke_item_code_id:
                    move_errors.append(_("On line %s, a tax with a KRA item code must be selected, since the tax is 0%% or exempt.", line.name))

            if move_errors:
                errors.append((move.name, move_errors))
        return errors

    def _l10n_ke_fiscal_device_details_filled(self):
        """ Tell whether the fiscal device details of the move are filled.

        :returns: True when the company has an active OSCU; otherwise a truthy
                  result only for a Kenyan move on which the control unit
                  details are set
        """
        # If the company is configured for OSCU, don't block the Send & Print.
        if self.company_id.l10n_ke_oscu_is_active:
            return True
        # NOTE(review): only the country condition was present in the reviewed copy.
        # The four control unit fields below are assumed from the method name and
        # from the fields tested in _l10n_ke_validate_move; confirm against the
        # repository before merging.
        return all([
            self.country_code == 'KE',
            self.l10n_ke_cu_invoice_number,
            self.l10n_ke_cu_serial_number,
            self.l10n_ke_cu_qrcode,
            self.l10n_ke_cu_datetime,
        ])

    # -------------------------------------------------------------------------
    # SERIALISERS
    # -------------------------------------------------------------------------

    def _l10n_ke_cu_open_invoice_message(self):
        """ Serialise the required fields for opening an invoice

        :returns: a list containing one byte-string representing the <CMD> and
                  <DATA> of the message sent to the fiscal device.
        """
        # NOTE(review): the address parts (and zip/city) are concatenated without
        # any separator; kept as is, confirm this is what the device expects.
        headquarter_address = (self.commercial_partner_id.street or '') + (self.commercial_partner_id.street2 or '')
        customer_address = (self.partner_id.street or '') + (self.partner_id.street2 or '')
        postcode_and_city = (self.partner_id.zip or '') + '' + (self.partner_id.city or '')
        # The PIN is only sent for partners located in Kenya.
        vat = (self.commercial_partner_id.vat or '').strip() if self.commercial_partner_id.country_id.code == 'KE' else ''
        invoice_elements = [
            b'1',  # Reserved - 1 symbol with value '1'
            b'0'.rjust(6),  # Reserved - 6 symbols with value 0 (space padded on the left)
            b'0',  # Reserved - 1 symbol with value '0'
            b'1' if self.move_type == 'out_invoice' else b'A',  # 1 symbol with value '1' (new invoice), 'A' (credit note), or '@' (debit note)
            self._l10n_ke_fmt(self.commercial_partner_id.name, 30),  # 30 symbols for Company name
            self._l10n_ke_fmt(vat, 14),  # 14 Symbols for the client PIN number
            self._l10n_ke_fmt(headquarter_address, 30),  # 30 Symbols for customer headquarters
            self._l10n_ke_fmt(customer_address, 30),  # 30 Symbols for the address
            self._l10n_ke_fmt(postcode_and_city, 30),  # 30 symbols for the customer post code and city
            self._l10n_ke_fmt('', 30),  # 30 symbols for the exemption number
        ]
        if self.move_type == 'out_refund':
            # 19 symbols for related invoice number
            invoice_elements.append(self._l10n_ke_fmt(self.reversed_entry_id.l10n_ke_cu_invoice_number, 19))
        # 15 symbols for trader system invoice number (the LAST 15 characters of the cleaned name)
        invoice_elements.append(re.sub('[^A-Za-z0-9 ]+', '', self.name)[-15:].ljust(15).encode('cp1251'))
        # Command: Open fiscal record (0x30)
        return [b'\x30' + b';'.join(invoice_elements)]

    def _l10n_ke_cu_lines_messages(self):
        """ Serialise the data of each line on the invoice

        This function transforms the lines in order to handle the differences
        between the KRA expected data and the lines in odoo.

        If a discount line (as a negative line) has been added to the invoice
        lines, find a suitable line/lines to distribute the discount across

        :returns: List of byte-strings representing each command <CMD> and the
                  <DATA> of the line, which will be sent to the fiscal device
                  in order to add a line to the opened invoice.
        """
        def is_discount_line(line):
            return line.price_subtotal < 0.0

        def is_candidate(discount_line, other_line):
            """ If the taxes of one line match those of the discount line, the discount can be distributed across that line """
            discount_taxes = discount_line.tax_ids.flatten_taxes_hierarchy()
            other_line_taxes = other_line.tax_ids.flatten_taxes_hierarchy()
            return set(discount_taxes.ids) == set(other_line_taxes.ids)

        lines = self.invoice_line_ids.filtered(lambda l: l.display_type == 'product' and l.quantity and l.price_total)

        # The device expects all monetary values in Kenyan Shillings
        if self.currency_id == self.company_id.currency_id:
            currency_rate = 1
        # In the case of a refund, use the currency rate of the original invoice
        elif self.move_type == 'out_refund' and self.reversed_entry_id:
            currency_rate = abs(self.reversed_entry_id.amount_total_signed / self.reversed_entry_id.amount_total)
        else:
            currency_rate = abs(self.amount_total_signed / self.amount_total)

        # Discount percentage to report per positive line, starting from the line's own discount.
        discount_dict = {line.id: line.discount for line in lines if line.price_total > 0}
        # NOTE(review): the loop-control statements (continue / else / break) were
        # missing from the reviewed copy and are restored from the surrounding
        # logic; confirm against the repository.
        for line in lines:
            if not is_discount_line(line):
                continue
            # Search for non-discount lines carrying the same taxes, largest amount first
            candidate_vals_list = [l for l in lines if not is_discount_line(l) and is_candidate(line, l)]
            candidate_vals_list = sorted(candidate_vals_list, key=lambda x: x.price_unit * x.quantity, reverse=True)
            line_to_discount = abs(line.price_unit * line.quantity)
            for candidate in candidate_vals_list:
                still_to_discount = abs(candidate.price_unit * candidate.quantity * (100.0 - discount_dict[candidate.id]) / 100.0)
                if line_to_discount >= still_to_discount:
                    # The candidate is fully absorbed; carry the remainder to the next one.
                    discount_dict[candidate.id] = 100.0
                    line_to_discount -= still_to_discount
                else:
                    rest_to_discount = abs((line_to_discount / (candidate.price_unit * candidate.quantity)) * 100.0)
                    discount_dict[candidate.id] += rest_to_discount
                    break

        msgs = []
        tax_details = self._prepare_invoice_aggregated_taxes()
        # Negative lines and lines discounted to 100% are not sent to the device.
        for line in lines.filtered(lambda l: l.price_total > 0 and discount_dict.get(l.id, 0.0) < 100):
            price_total = 0
            percentage = 0
            item_code = line.tax_ids[0].l10n_ke_item_code_id
            line_taxes = tax_details['tax_details_per_record'][line]['tax_details']
            for tax in line_taxes:
                if tax.amount in (16, 8, 0):  # This should only occur once
                    line_tax_details = line_taxes[tax]
                    price_total = abs(line_tax_details['base_amount_currency']) + abs(line_tax_details['tax_amount_currency'])
                    percentage = tax.amount
            # Here we use the original discount of the line, since the distributed discount has not been applied in the price_total
            price = round(price_total / abs(line.quantity) * 100 / (100 - line.discount), 2) * currency_rate
            price = ('%.5f' % price).rstrip('0').rstrip('.')
            uom = line.product_uom_id and line.product_uom_id.name or ''
            line_data = b';'.join([
                self._l10n_ke_fmt(line.product_id.display_name or line.name, 36),  # 36 symbols for the article's name
                self._l10n_ke_fmt(item_code.tax_rate or 'A', 1),  # 1 symbol for article's vat class ('A', 'B', 'C', 'D', or 'E')
                price[:15].encode('cp1251'),  # 1 to 15 symbols for article's price with up to 5 digits after decimal point
                self._l10n_ke_fmt(uom, 3),  # 3 symbols for unit of measure
                (item_code.code or '').ljust(10).encode('cp1251'),  # 10 symbols for KRA item code in the format xxxx.xx.xx (can be empty)
                self._l10n_ke_fmt(item_code.description or '', 20),  # 20 symbols for KRA item code description (can be empty)
                str(percentage).encode('cp1251')[:5],  # up to 5 symbols for vat rate
            ])
            # 1 to 10 symbols for quantity
            line_data += b'*' + str(abs(line.quantity)).encode('cp1251')[:10]
            if discount_dict.get(line.id):
                # 1 to 7 symbols for percentage of discount/addition
                discount_sign = b'-' if discount_dict[line.id] > 0 else b'+'
                discount = discount_sign + str(abs(discount_dict[line.id])).encode('cp1251')[:6]
                line_data += b',' + discount + b'%'
            # Command: Sale of article (0x31)
            msgs.append(b'\x31' + line_data)
        return msgs

    def _l10n_ke_get_cu_messages(self):
        """ Composes a list of all the command and data parts of the messages
        required for the fiscal device to open an invoice, add lines and
        subsequently close it.

        :returns: list of byte-strings: the open message, one message per line,
                  then the close (0x38) and read date/time (0x68) commands
        """
        msgs = self._l10n_ke_cu_open_invoice_message()
        msgs += self._l10n_ke_cu_lines_messages()
        # Command: Close fiscal receipt (0x38)
        msgs += [b'\x38']
        # Command: Read date and time (0x68)
        msgs += [b'\x68']
        return msgs

    # -------------------------------------------------------------------------
    # POST COMMANDS / RECEIVE DATA
    # -------------------------------------------------------------------------

    def l10n_ke_action_cu_post(self):
        """ Returns the client action descriptor dictionary for sending the
        invoice(s) to the control unit (the fiscal device).

        :returns: an ``ir.actions.client`` dictionary whose ``params`` hold one
                  entry per move with its serialised messages
        :raises UserError: when the company has an active OSCU, or when any
                           move fails _l10n_ke_validate_move
        """
        # If l10n_ke_edi_oscu is configured for the company, disable sending via TREMOL.
        if self.company_id.l10n_ke_oscu_is_active:
            raise UserError(
                _('An OSCU has been initialized for this company. Please send the e-invoice via Send and Print -> Send to eTIMS instead.')
            )
        # Check the configuration of the invoice
        errors = self._l10n_ke_validate_move()
        if errors:
            raise UserError("".join(
                _("Invalid invoice configuration on %(invoice)s:\n%(error_list)s\n\n", invoice=move_name, error_list='\n'.join(move_errors))
                for move_name, move_errors in errors
            ))
        return {
            'type': 'ir.actions.client',
            'tag': 'l10n_ke_post_send',
            'params': [
                {
                    'move_id': move.id,
                    # The byte messages are decoded so that they can be serialised to JSON.
                    'messages': json.dumps([msg.decode('cp1251') for msg in move._l10n_ke_get_cu_messages()]),
                    'proxy_address': move.company_id.l10n_ke_cu_proxy_address,
                    'company_vat': move.company_id.vat,
                    'name': move.name,
                } for move in self
            ],
        }

    def l10n_ke_cu_responses(self, responses):
        """ Set the fields related to the fiscal device on the invoice.

        This is intended to be utilized by an RPC call from the javascript
        client action. The fields are prefixed with l10n_ke_cu_*, which refers
        to the fact that they originate from the control unit.

        :param responses: iterable of dictionaries with the keys ``move_id``,
                          ``serial_number`` and ``replies``; the second to last
                          reply is read as ``<invoice number>;<qr code>`` and
                          the last one as a ``%d-%m-%Y %H:%M`` timestamp
        """
        for response in responses:
            move = self.browse(int(response['move_id']))
            replies = list(response['replies'])
            invoice_reply = replies[-2].split(';')
            # NOTE(review): the head of this call was missing from the reviewed
            # copy; `update` is assumed (`write` would store the same values).
            move.update({
                'l10n_ke_cu_serial_number': response['serial_number'],
                'l10n_ke_cu_invoice_number': invoice_reply[0],
                'l10n_ke_cu_qrcode': invoice_reply[1].strip(),
                'l10n_ke_cu_datetime': datetime.strptime(replies[-1], '%d-%m-%Y %H:%M'),
            })