# -*- coding: utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. import logging import json import re from datetime import datetime from odoo import models, fields, _, api from odoo.exceptions import UserError _logger = logging.getLogger(__name__) class AccountMove(models.Model): _inherit = 'account.move' l10n_ke_cu_datetime = fields.Datetime(string='CU Signing Date and Time', copy=False) l10n_ke_cu_serial_number = fields.Char(string='CU Serial Number', copy=False) l10n_ke_cu_invoice_number = fields.Char(string='CU Invoice Number', copy=False) l10n_ke_cu_qrcode = fields.Char(string='CU QR Code', copy=False) l10n_ke_cu_show_send_button = fields.Boolean(string='Show Send to Tremol button', compute='_compute_l10n_ke_cu_show_send_button') @api.depends('country_code', 'l10n_ke_cu_qrcode', 'state', 'move_type', 'company_id') def _compute_l10n_ke_cu_show_send_button(self): for move in self: move.l10n_ke_cu_show_send_button = ( move.country_code == 'KE' and not move.l10n_ke_cu_qrcode and move.state == 'posted' and move.move_type in ['out_invoice', 'out_refund'] and not move.company_id.l10n_ke_oscu_is_active ) # ------------------------------------------------------------------------- # HELPERS # ------------------------------------------------------------------------- def _l10n_ke_fmt(self, string, length, ljust=True): """ Function for common formatting behaviour :param string: string to be formatted/encoded :param length: integer length to justify (if enabled), and then truncate the string to :param ljust: boolean representing whether the string should be justified :returns: byte-string justified/truncated, with all non-alphanumeric characters removed """ if not string: string = '' return re.sub('[^A-Za-z0-9 ]+', '', str(string)).encode('cp1251').ljust(length if ljust else 0)[:length] # ------------------------------------------------------------------------- # CHECKS # ------------------------------------------------------------------------- def _l10n_ke_validate_move(self): """ Returns list of errors related to misconfigurations per move Find misconfigurations on the move, the lines of the move, and the taxes on those lines that would result in rejection by the KRA. """ errors = [] for move in self: move_errors = [] if move.country_code != 'KE': move_errors.append(_("This invoice is not a Kenyan invoice and therefore can not be sent to the device.")) if move.company_id.currency_id != self.env.ref('base.KES'): move_errors.append(_("This invoice's company currency is not in Kenyan Shillings, conversion to KES is not possible.")) if move.state != 'posted': move_errors.append(_("This invoice/credit note has not been posted. Please confirm it to continue.")) if move.move_type not in ('out_refund', 'out_invoice'): move_errors.append(_("The document being sent should be an invoice or credit note.")) if any([move.l10n_ke_cu_invoice_number, move.l10n_ke_cu_serial_number, move.l10n_ke_cu_qrcode, move.l10n_ke_cu_datetime]): move_errors.append(_("The document already has details related to the fiscal device. Please make sure that the invoice has not already been sent.")) # The credit note should refer to the control unit number (receipt number) of the original # invoice to which it relates. if move.move_type == 'out_refund' and not move.reversed_entry_id.l10n_ke_cu_invoice_number: move_errors.append(_("This credit note must reference the previous invoice, and this previous invoice must have already been submitted.")) for line in self.invoice_line_ids.filtered(lambda l: l.display_type == 'product'): vat_taxes = line.tax_ids.filtered(lambda tax: tax.amount in (16, 8, 0)) if not vat_taxes or len(vat_taxes) > 1: move_errors.append(_("On line %s, you must select one and only one VAT tax.", line.name)) else: if vat_taxes[0].amount == 0 and not line.tax_ids[0].l10n_ke_item_code_id: move_errors.append(_("On line %s, a tax with a KRA item code must be selected, since the tax is 0%% or exempt.", line.name)) if move_errors: errors.append((move.name, move_errors)) return errors def _l10n_ke_fiscal_device_details_filled(self): self.ensure_one() # If the company is configured for OSCU, don't block the Send & Print. if self.company_id.l10n_ke_oscu_is_active: return True return all([ self.country_code == 'KE', self.l10n_ke_cu_invoice_number, self.l10n_ke_cu_serial_number, self.l10n_ke_cu_qrcode, self.l10n_ke_cu_datetime, ]) # ------------------------------------------------------------------------- # SERIALISERS # ------------------------------------------------------------------------- def _l10n_ke_cu_open_invoice_message(self): """ Serialise the required fields for opening an invoice :returns: a list containing one byte-string representing the and of the message sent to the fiscal device. """ headquarter_address = (self.commercial_partner_id.street or '') + (self.commercial_partner_id.street2 or '') customer_address = (self.partner_id.street or '') + (self.partner_id.street2 or '') postcode_and_city = (self.partner_id.zip or '') + '' + (self.partner_id.city or '') vat = (self.commercial_partner_id.vat or '').strip() if self.commercial_partner_id.country_id.code == 'KE' else '' invoice_elements = [ b'1', # Reserved - 1 symbol with value '1' b' 0', # Reserved - 6 symbols with value ‘ 0’ b'0', # Reserved - 1 symbol with value '0' b'1' if self.move_type == 'out_invoice' else b'A', # 1 symbol with value '1' (new invoice), 'A' (credit note), or '@' (debit note) self._l10n_ke_fmt(self.commercial_partner_id.name, 30), # 30 symbols for Company name self._l10n_ke_fmt(vat, 14), # 14 Symbols for the client PIN number self._l10n_ke_fmt(headquarter_address, 30), # 30 Symbols for customer headquarters self._l10n_ke_fmt(customer_address, 30), # 30 Symbols for the address self._l10n_ke_fmt(postcode_and_city, 30), # 30 symbols for the customer post code and city self._l10n_ke_fmt('', 30), # 30 symbols for the exemption number ] if self.move_type == 'out_refund': invoice_elements.append(self._l10n_ke_fmt(self.reversed_entry_id.l10n_ke_cu_invoice_number, 19)), # 19 symbols for related invoice number invoice_elements.append(re.sub('[^A-Za-z0-9 ]+', '', self.name)[-15:].ljust(15).encode('cp1251')) # 15 symbols for trader system invoice number # Command: Open fiscal record (0x30) return [b'\x30' + b';'.join(invoice_elements)] def _l10n_ke_cu_lines_messages(self): """ Serialise the data of each line on the invoice This function transforms the lines in order to handle the differences between the KRA expected data and the lines in odoo. If a discount line (as a negative line) has been added to the invoice lines, find a suitable line/lines to distribute the discount accross :returns: List of byte-strings representing each command and the of the line, which will be sent to the fiscal device in order to add a line to the opened invoice. """ def is_discount_line(line): return line.price_subtotal < 0.0 def is_candidate(discount_line, other_line): """ If the of one line match those of the discount line, the discount can be distributed accross that line """ discount_taxes = discount_line.tax_ids.flatten_taxes_hierarchy() other_line_taxes = other_line.tax_ids.flatten_taxes_hierarchy() return set(discount_taxes.ids) == set(other_line_taxes.ids) lines = self.invoice_line_ids.filtered(lambda l: l.display_type == 'product' and l.quantity and l.price_total) # The device expects all monetary values in Kenyan Shillings if self.currency_id == self.company_id.currency_id: currency_rate = 1 # In the case of a refund, use the currency rate of the original invoice elif self.move_type == 'out_refund' and self.reversed_entry_id: currency_rate = abs(self.reversed_entry_id.amount_total_signed / self.reversed_entry_id.amount_total) else: currency_rate = abs(self.amount_total_signed / self.amount_total) discount_dict = {line.id: line.discount for line in lines if line.price_total > 0} for line in lines: if not is_discount_line(line): continue # Search for non-discount lines candidate_vals_list = [l for l in lines if not is_discount_line(l) and is_candidate(l, line)] candidate_vals_list = sorted(candidate_vals_list, key=lambda x: x.price_unit * x.quantity, reverse=True) line_to_discount = abs(line.price_unit * line.quantity) for candidate in candidate_vals_list: still_to_discount = abs(candidate.price_unit * candidate.quantity * (100.0 - discount_dict[candidate.id]) / 100.0) if line_to_discount >= still_to_discount: discount_dict[candidate.id] = 100.0 line_to_discount -= still_to_discount else: rest_to_discount = abs((line_to_discount / (candidate.price_unit * candidate.quantity)) * 100.0) discount_dict[candidate.id] += rest_to_discount break msgs = [] tax_details = self._prepare_invoice_aggregated_taxes() for line in self.invoice_line_ids.filtered(lambda l: l.display_type == 'product' and l.quantity and l.price_total > 0 and not discount_dict.get(l.id) >= 100): # Here we use the original discount of the line, since it the distributed discount has not been applied in the price_total price_total = 0 percentage = 0 item_code = line.tax_ids[0].l10n_ke_item_code_id for tax in tax_details['tax_details_per_record'][line]['tax_details']: if tax.amount in (16, 8, 0): # This should only occur once line_tax_details = tax_details['tax_details_per_record'][line]['tax_details'][tax] price_total = abs(line_tax_details['base_amount_currency']) + abs(line_tax_details['tax_amount_currency']) percentage = tax.amount price = round(price_total / abs(line.quantity) * 100 / (100 - line.discount), 2) * currency_rate price = ('%.5f' % price).rstrip('0').rstrip('.') uom = line.product_uom_id and line.product_uom_id.name or '' line_data = b';'.join([ self._l10n_ke_fmt(line.product_id.display_name or line.name, 36), # 36 symbols for the article's name self._l10n_ke_fmt(item_code.tax_rate or 'A', 1), # 1 symbol for article's vat class ('A', 'B', 'C', 'D', or 'E') price[:15].encode('cp1251'), # 1 to 15 symbols for article's price with up to 5 digits after decimal point self._l10n_ke_fmt(uom, 3), # 3 symbols for unit of measure (item_code.code or '').ljust(10).encode('cp1251'), # 10 symbols for KRA item code in the format xxxx.xx.xx (can be empty) self._l10n_ke_fmt(item_code.description or '', 20), # 20 symbols for KRA item code description (can be empty) str(percentage).encode('cp1251')[:5] # up to 5 symbols for vat rate ]) # 1 to 10 symbols for quantity line_data += b'*' + str(abs(line.quantity)).encode('cp1251')[:10] if discount_dict.get(line.id): # 1 to 7 symbols for percentage of discount/addition discount_sign = b'-' if discount_dict[line.id] > 0 else b'+' discount = discount_sign + str(abs(discount_dict[line.id])).encode('cp1251')[:6] line_data += b',' + discount + b'%' # Command: Sale of article (0x31) msgs += [b'\x31' + line_data] return msgs def _l10n_ke_get_cu_messages(self): """ Composes a list of all the command and data parts of the messages required for the fiscal device to open an invoice, add lines and subsequently close it. """ self.ensure_one() msgs = self._l10n_ke_cu_open_invoice_message() msgs += self._l10n_ke_cu_lines_messages() # Command: Close fiscal reciept (0x38) msgs += [b'\x38'] # Command: Read date and time (0x68) msgs += [b'\x68'] return msgs # ------------------------------------------------------------------------- # POST COMMANDS / RECEIVE DATA # ------------------------------------------------------------------------- def l10n_ke_action_cu_post(self): """ Returns the client action descriptor dictionary for sending the invoice(s) to the control unit (the fiscal device). """ # If l10n_ke_edi_oscu is configured for the company, disable sending via TREMOL. if self.company_id.l10n_ke_oscu_is_active: raise UserError( _('An OSCU has been initialized for this company. Please send the e-invoice via Send and Print -> Send to eTIMS instead.') ) # Check the configuration of the invoice errors = self._l10n_ke_validate_move() if errors: error_msg = "" for move, error_list in errors: error_list = '\n'.join(error_list) error_msg += _("Invalid invoice configuration on %(invoice)s:\n%(error_list)s\n\n", invoice=move, error_list=error_list) raise UserError(error_msg) return { 'type': 'ir.actions.client', 'tag': 'l10n_ke_post_send', 'params': [ { 'move_id': move.id, 'messages': json.dumps([msg.decode('cp1251') for msg in move._l10n_ke_get_cu_messages()]), 'proxy_address': move.company_id.l10n_ke_cu_proxy_address, 'company_vat': move.company_id.vat, 'name': move.name, } for move in self ] } def l10n_ke_cu_responses(self, responses): """ Set the fields related to the fiscal device on the invoice. This is intended to be utilized by an RPC call from the javascript client action. The fields are prefixed with l10n_ke_cu_*, which refers to the fact that they originate from the control unit. """ for response in responses: move = self.browse(int(response['move_id'])) replies = [msg for msg in response['replies']] move.update({ 'l10n_ke_cu_serial_number': response['serial_number'], 'l10n_ke_cu_invoice_number': replies[-2].split(';')[0], 'l10n_ke_cu_qrcode': replies[-2].split(';')[1].strip(), 'l10n_ke_cu_datetime': datetime.strptime(replies[-1], '%d-%m-%Y %H:%M'), })