1286 lines
65 KiB
Python
1286 lines
65 KiB
Python
|
# -*- coding: utf-8 -*-
|
|||
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|||
|
|
|||
|
import logging
|
|||
|
|
|||
|
from ast import literal_eval
|
|||
|
from collections import defaultdict
|
|||
|
from psycopg2 import Error
|
|||
|
|
|||
|
from odoo import _, api, fields, models
|
|||
|
from odoo.exceptions import RedirectWarning, UserError, ValidationError
|
|||
|
from odoo.osv import expression
|
|||
|
from odoo.tools import check_barcode_encoding, groupby
|
|||
|
from odoo.tools.float_utils import float_compare, float_is_zero
|
|||
|
|
|||
|
_logger = logging.getLogger(__name__)
|
|||
|
|
|||
|
|
|||
|
class StockQuant(models.Model):
|
|||
|
_name = 'stock.quant'
|
|||
|
_description = 'Quants'
|
|||
|
_rec_name = 'product_id'
|
|||
|
|
|||
|
def _domain_location_id(self):
|
|||
|
if not self._is_inventory_mode():
|
|||
|
return
|
|||
|
return [('usage', 'in', ['internal', 'transit'])]
|
|||
|
|
|||
|
def _domain_lot_id(self):
|
|||
|
if not self._is_inventory_mode():
|
|||
|
return
|
|||
|
domain = [
|
|||
|
"'|'",
|
|||
|
"('company_id', '=', company_id)",
|
|||
|
"('company_id', '=', False)"
|
|||
|
]
|
|||
|
if self.env.context.get('active_model') == 'product.product':
|
|||
|
domain.insert(0, "('product_id', '=', %s)" % self.env.context.get('active_id'))
|
|||
|
elif self.env.context.get('active_model') == 'product.template':
|
|||
|
product_template = self.env['product.template'].browse(self.env.context.get('active_id'))
|
|||
|
if product_template.exists():
|
|||
|
domain.insert(0, "('product_id', 'in', %s)" % product_template.product_variant_ids.ids)
|
|||
|
else:
|
|||
|
domain.insert(0, "('product_id', '=', product_id)")
|
|||
|
return '[' + ', '.join(domain) + ']'
|
|||
|
|
|||
|
def _domain_product_id(self):
|
|||
|
if not self._is_inventory_mode():
|
|||
|
return
|
|||
|
domain = [('type', '=', 'product')]
|
|||
|
if self.env.context.get('product_tmpl_ids') or self.env.context.get('product_tmpl_id'):
|
|||
|
products = self.env.context.get('product_tmpl_ids', []) + [self.env.context.get('product_tmpl_id', 0)]
|
|||
|
domain = expression.AND([domain, [('product_tmpl_id', 'in', products)]])
|
|||
|
return domain
|
|||
|
|
|||
|
product_id = fields.Many2one(
|
|||
|
'product.product', 'Product',
|
|||
|
domain=lambda self: self._domain_product_id(),
|
|||
|
ondelete='restrict', required=True, index=True, check_company=True)
|
|||
|
product_tmpl_id = fields.Many2one(
|
|||
|
'product.template', string='Product Template',
|
|||
|
related='product_id.product_tmpl_id')
|
|||
|
product_uom_id = fields.Many2one(
|
|||
|
'uom.uom', 'Unit of Measure',
|
|||
|
readonly=True, related='product_id.uom_id')
|
|||
|
priority = fields.Selection(related='product_tmpl_id.priority')
|
|||
|
company_id = fields.Many2one(related='location_id.company_id', string='Company', store=True, readonly=True)
|
|||
|
location_id = fields.Many2one(
|
|||
|
'stock.location', 'Location',
|
|||
|
domain=lambda self: self._domain_location_id(),
|
|||
|
auto_join=True, ondelete='restrict', required=True, index=True, check_company=True)
|
|||
|
warehouse_id = fields.Many2one('stock.warehouse', related='location_id.warehouse_id')
|
|||
|
storage_category_id = fields.Many2one(related='location_id.storage_category_id', store=True)
|
|||
|
cyclic_inventory_frequency = fields.Integer(related='location_id.cyclic_inventory_frequency')
|
|||
|
lot_id = fields.Many2one(
|
|||
|
'stock.lot', 'Lot/Serial Number', index=True,
|
|||
|
ondelete='restrict', check_company=True,
|
|||
|
domain=lambda self: self._domain_lot_id())
|
|||
|
sn_duplicated = fields.Boolean(string="Duplicated Serial Number", compute='_compute_sn_duplicated', help="If the same SN is in another Quant")
|
|||
|
package_id = fields.Many2one(
|
|||
|
'stock.quant.package', 'Package',
|
|||
|
domain="[('location_id', '=', location_id)]",
|
|||
|
help='The package containing this quant', ondelete='restrict', check_company=True, index=True)
|
|||
|
owner_id = fields.Many2one(
|
|||
|
'res.partner', 'Owner',
|
|||
|
help='This is the owner of the quant', check_company=True)
|
|||
|
quantity = fields.Float(
|
|||
|
'Quantity',
|
|||
|
help='Quantity of products in this quant, in the default unit of measure of the product',
|
|||
|
readonly=True, digits='Product Unit of Measure')
|
|||
|
reserved_quantity = fields.Float(
|
|||
|
'Reserved Quantity',
|
|||
|
default=0.0,
|
|||
|
help='Quantity of reserved products in this quant, in the default unit of measure of the product',
|
|||
|
readonly=True, required=True, digits='Product Unit of Measure')
|
|||
|
available_quantity = fields.Float(
|
|||
|
'Available Quantity',
|
|||
|
help="On hand quantity which hasn't been reserved on a transfer, in the default unit of measure of the product",
|
|||
|
compute='_compute_available_quantity', digits='Product Unit of Measure')
|
|||
|
in_date = fields.Datetime('Incoming Date', readonly=True, required=True, default=fields.Datetime.now)
|
|||
|
tracking = fields.Selection(related='product_id.tracking', readonly=True)
|
|||
|
on_hand = fields.Boolean('On Hand', store=False, search='_search_on_hand')
|
|||
|
product_categ_id = fields.Many2one(related='product_tmpl_id.categ_id')
|
|||
|
|
|||
|
# Inventory Fields
|
|||
|
inventory_quantity = fields.Float(
|
|||
|
'Counted Quantity', digits='Product Unit of Measure',
|
|||
|
help="The product's counted quantity.")
|
|||
|
inventory_quantity_auto_apply = fields.Float(
|
|||
|
'Inventoried Quantity', digits='Product Unit of Measure',
|
|||
|
compute='_compute_inventory_quantity_auto_apply',
|
|||
|
inverse='_set_inventory_quantity', groups='stock.group_stock_manager'
|
|||
|
)
|
|||
|
inventory_diff_quantity = fields.Float(
|
|||
|
'Difference', compute='_compute_inventory_diff_quantity', store=True,
|
|||
|
help="Indicates the gap between the product's theoretical quantity and its counted quantity.",
|
|||
|
readonly=True, digits='Product Unit of Measure')
|
|||
|
inventory_date = fields.Date(
|
|||
|
'Scheduled Date', compute='_compute_inventory_date', store=True, readonly=False,
|
|||
|
help="Next date the On Hand Quantity should be counted.")
|
|||
|
last_count_date = fields.Date(compute='_compute_last_count_date', help='Last time the Quantity was Updated')
|
|||
|
inventory_quantity_set = fields.Boolean(store=True, compute='_compute_inventory_quantity_set', readonly=False, default=False)
|
|||
|
is_outdated = fields.Boolean('Quantity has been moved since last count', compute='_compute_is_outdated')
|
|||
|
user_id = fields.Many2one(
|
|||
|
'res.users', 'Assigned To', help="User assigned to do product count.",
|
|||
|
domain=lambda self: [('groups_id', 'in', self.env.ref('stock.group_stock_user').id)])
|
|||
|
|
|||
|
@api.depends('quantity', 'reserved_quantity')
|
|||
|
def _compute_available_quantity(self):
|
|||
|
for quant in self:
|
|||
|
quant.available_quantity = quant.quantity - quant.reserved_quantity
|
|||
|
|
|||
|
@api.depends('location_id')
|
|||
|
def _compute_inventory_date(self):
|
|||
|
quants = self.filtered(lambda q: not q.inventory_date and q.location_id.usage in ['internal', 'transit'])
|
|||
|
date_by_location = {loc: loc._get_next_inventory_date() for loc in quants.location_id}
|
|||
|
for quant in quants:
|
|||
|
quant.inventory_date = date_by_location[quant.location_id]
|
|||
|
|
|||
|
def _compute_last_count_date(self):
|
|||
|
""" We look at the stock move lines associated with every quant to get the last count date.
|
|||
|
"""
|
|||
|
self.last_count_date = False
|
|||
|
groups = self.env['stock.move.line']._read_group(
|
|||
|
[
|
|||
|
('state', '=', 'done'),
|
|||
|
('is_inventory', '=', True),
|
|||
|
('product_id', 'in', self.product_id.ids),
|
|||
|
'|',
|
|||
|
('lot_id', 'in', self.lot_id.ids),
|
|||
|
('lot_id', '=', False),
|
|||
|
'|',
|
|||
|
('owner_id', 'in', self.owner_id.ids),
|
|||
|
('owner_id', '=', False),
|
|||
|
'|',
|
|||
|
('location_id', 'in', self.location_id.ids),
|
|||
|
('location_dest_id', 'in', self.location_id.ids),
|
|||
|
'|',
|
|||
|
('package_id', '=', False),
|
|||
|
'|',
|
|||
|
('package_id', 'in', self.package_id.ids),
|
|||
|
('result_package_id', 'in', self.package_id.ids),
|
|||
|
],
|
|||
|
['date:max', 'product_id', 'lot_id', 'package_id', 'owner_id', 'result_package_id', 'location_id', 'location_dest_id'],
|
|||
|
['product_id', 'lot_id', 'package_id', 'owner_id', 'result_package_id', 'location_id', 'location_dest_id'],
|
|||
|
lazy=False)
|
|||
|
|
|||
|
def _update_dict(date_by_quant, key, value):
|
|||
|
current_date = date_by_quant.get(key)
|
|||
|
if not current_date or value > current_date:
|
|||
|
date_by_quant[key] = value
|
|||
|
|
|||
|
date_by_quant = {}
|
|||
|
for group in groups:
|
|||
|
move_line_date = group['date']
|
|||
|
location_id = group['location_id'][0]
|
|||
|
location_dest_id = group['location_dest_id'][0]
|
|||
|
package_id = group['package_id'] and group['package_id'][0]
|
|||
|
result_package_id = group['result_package_id'] and group['result_package_id'][0]
|
|||
|
lot_id = group['lot_id'] and group['lot_id'][0]
|
|||
|
owner_id = group['owner_id'] and group['owner_id'][0]
|
|||
|
product_id = group['product_id'][0]
|
|||
|
_update_dict(date_by_quant, (location_id, package_id, product_id, lot_id, owner_id), move_line_date)
|
|||
|
_update_dict(date_by_quant, (location_dest_id, package_id, product_id, lot_id, owner_id), move_line_date)
|
|||
|
_update_dict(date_by_quant, (location_id, result_package_id, product_id, lot_id, owner_id), move_line_date)
|
|||
|
_update_dict(date_by_quant, (location_dest_id, result_package_id, product_id, lot_id, owner_id), move_line_date)
|
|||
|
for quant in self:
|
|||
|
quant.last_count_date = date_by_quant.get((quant.location_id.id, quant.package_id.id, quant.product_id.id, quant.lot_id.id, quant.owner_id.id))
|
|||
|
|
|||
|
@api.depends('inventory_quantity')
|
|||
|
def _compute_inventory_diff_quantity(self):
|
|||
|
for quant in self:
|
|||
|
quant.inventory_diff_quantity = quant.inventory_quantity - quant.quantity
|
|||
|
|
|||
|
@api.depends('inventory_quantity')
|
|||
|
def _compute_inventory_quantity_set(self):
|
|||
|
self.inventory_quantity_set = True
|
|||
|
|
|||
|
@api.depends('inventory_quantity', 'quantity', 'product_id')
|
|||
|
def _compute_is_outdated(self):
|
|||
|
self.is_outdated = False
|
|||
|
for quant in self:
|
|||
|
if quant.product_id and float_compare(quant.inventory_quantity - quant.inventory_diff_quantity, quant.quantity, precision_rounding=quant.product_uom_id.rounding) and quant.inventory_quantity_set:
|
|||
|
quant.is_outdated = True
|
|||
|
|
|||
|
@api.depends('quantity')
|
|||
|
def _compute_inventory_quantity_auto_apply(self):
|
|||
|
for quant in self:
|
|||
|
quant.inventory_quantity_auto_apply = quant.quantity
|
|||
|
|
|||
|
@api.depends('lot_id')
|
|||
|
def _compute_sn_duplicated(self):
|
|||
|
self.sn_duplicated = False
|
|||
|
domain = [('tracking', '=', 'serial'), ('lot_id', 'in', self.lot_id.ids), ('location_id.usage', 'in', ['internal', 'transit'])]
|
|||
|
results = self._read_group(domain, ['lot_id'], ['lot_id'])
|
|||
|
duplicated_sn_ids = [x['lot_id'][0] for x in results if x['lot_id_count'] > 1]
|
|||
|
quants_with_duplicated_sn = self.env['stock.quant'].search([('lot_id', 'in', duplicated_sn_ids)])
|
|||
|
quants_with_duplicated_sn.sn_duplicated = True
|
|||
|
|
|||
|
def _set_inventory_quantity(self):
|
|||
|
""" Inverse method to create stock move when `inventory_quantity` is set
|
|||
|
(`inventory_quantity` is only accessible in inventory mode).
|
|||
|
"""
|
|||
|
if not self._is_inventory_mode():
|
|||
|
return
|
|||
|
for quant in self:
|
|||
|
quant.inventory_quantity = quant.inventory_quantity_auto_apply
|
|||
|
self.action_apply_inventory()
|
|||
|
|
|||
|
def _search_on_hand(self, operator, value):
|
|||
|
"""Handle the "on_hand" filter, indirectly calling `_get_domain_locations`."""
|
|||
|
if operator not in ['=', '!='] or not isinstance(value, bool):
|
|||
|
raise UserError(_('Operation not supported'))
|
|||
|
domain_loc = self.env['product.product']._get_domain_locations()[0]
|
|||
|
quant_query = self.env['stock.quant']._search(domain_loc)
|
|||
|
if (operator == '!=' and value is True) or (operator == '=' and value is False):
|
|||
|
domain_operator = 'not in'
|
|||
|
else:
|
|||
|
domain_operator = 'in'
|
|||
|
return [('id', domain_operator, quant_query)]
|
|||
|
|
|||
|
@api.model_create_multi
|
|||
|
def create(self, vals_list):
|
|||
|
""" Override to handle the "inventory mode" and create a quant as
|
|||
|
superuser the conditions are met.
|
|||
|
"""
|
|||
|
quants = self.env['stock.quant']
|
|||
|
is_inventory_mode = self._is_inventory_mode()
|
|||
|
allowed_fields = self._get_inventory_fields_create()
|
|||
|
for vals in vals_list:
|
|||
|
if is_inventory_mode and any(f in vals for f in ['inventory_quantity', 'inventory_quantity_auto_apply']):
|
|||
|
if any(field for field in vals.keys() if field not in allowed_fields):
|
|||
|
raise UserError(_("Quant's creation is restricted, you can't do this operation."))
|
|||
|
auto_apply = 'inventory_quantity_auto_apply' in vals
|
|||
|
inventory_quantity = vals.pop('inventory_quantity_auto_apply', False) or vals.pop(
|
|||
|
'inventory_quantity', False) or 0
|
|||
|
# Create an empty quant or write on a similar one.
|
|||
|
product = self.env['product.product'].browse(vals['product_id'])
|
|||
|
location = self.env['stock.location'].browse(vals['location_id'])
|
|||
|
lot_id = self.env['stock.lot'].browse(vals.get('lot_id'))
|
|||
|
package_id = self.env['stock.quant.package'].browse(vals.get('package_id'))
|
|||
|
owner_id = self.env['res.partner'].browse(vals.get('owner_id'))
|
|||
|
quant = self.env['stock.quant']
|
|||
|
if not self.env.context.get('import_file'):
|
|||
|
# Merge quants later, to make sure one line = one record during batch import
|
|||
|
quant = self._gather(product, location, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=True)
|
|||
|
if lot_id:
|
|||
|
quant = quant.filtered(lambda q: q.lot_id)
|
|||
|
if quant:
|
|||
|
quant = quant[0].sudo()
|
|||
|
else:
|
|||
|
quant = self.sudo().create(vals)
|
|||
|
if 'quants_cache' in self.env.context:
|
|||
|
self.env.context['quants_cache'][
|
|||
|
quant.product_id.id, quant.location_id.id, quant.lot_id.id, quant.package_id.id, quant.owner_id.id
|
|||
|
] |= quant
|
|||
|
if auto_apply:
|
|||
|
quant.write({'inventory_quantity_auto_apply': inventory_quantity})
|
|||
|
else:
|
|||
|
# Set the `inventory_quantity` field to create the necessary move.
|
|||
|
quant.inventory_quantity = inventory_quantity
|
|||
|
quant.user_id = vals.get('user_id', self.env.user.id)
|
|||
|
quant.inventory_date = fields.Date.today()
|
|||
|
quants |= quant
|
|||
|
else:
|
|||
|
quant = super().create(vals)
|
|||
|
if 'quants_cache' in self.env.context:
|
|||
|
self.env.context['quants_cache'][
|
|||
|
quant.product_id.id, quant.location_id.id, quant.lot_id.id, quant.package_id.id, quant.owner_id.id
|
|||
|
] |= quant
|
|||
|
quants |= quant
|
|||
|
if self._is_inventory_mode():
|
|||
|
quant._check_company()
|
|||
|
return quants
|
|||
|
|
|||
|
def _load_records_create(self, values):
|
|||
|
""" Add default location if import file did not fill it"""
|
|||
|
company_user = self.env.company
|
|||
|
warehouse = self.env['stock.warehouse'].search([('company_id', '=', company_user.id)], limit=1)
|
|||
|
for value in values:
|
|||
|
if 'location_id' not in value:
|
|||
|
value['location_id'] = warehouse.lot_stock_id.id
|
|||
|
return super(StockQuant, self.with_context(inventory_mode=True))._load_records_create(values)
|
|||
|
|
|||
|
def _load_records_write(self, values):
|
|||
|
""" Only allowed fields should be modified """
|
|||
|
return super(StockQuant, self.with_context(inventory_mode=True))._load_records_write(values)
|
|||
|
|
|||
|
@api.model
|
|||
|
def read_group(self, domain, fields, groupby, offset=0, limit=None, orderby=False, lazy=True):
|
|||
|
""" Override to set the `inventory_quantity` field if we're in "inventory mode" as well
|
|||
|
as to compute the sum of the `available_quantity` field.
|
|||
|
"""
|
|||
|
if 'available_quantity' in fields:
|
|||
|
if 'quantity' not in fields:
|
|||
|
fields.append('quantity')
|
|||
|
if 'reserved_quantity' not in fields:
|
|||
|
fields.append('reserved_quantity')
|
|||
|
if 'inventory_quantity_auto_apply' in fields and 'quantity' not in fields:
|
|||
|
fields.append('quantity')
|
|||
|
result = super(StockQuant, self).read_group(domain, fields, groupby, offset=offset, limit=limit, orderby=orderby, lazy=lazy)
|
|||
|
for group in result:
|
|||
|
if self.env.context.get('inventory_report_mode'):
|
|||
|
group['inventory_quantity'] = False
|
|||
|
if 'available_quantity' in fields:
|
|||
|
group['available_quantity'] = group['quantity'] - group['reserved_quantity']
|
|||
|
if 'inventory_quantity_auto_apply' in fields:
|
|||
|
group['inventory_quantity_auto_apply'] = group['quantity']
|
|||
|
return result
|
|||
|
|
|||
|
@api.model
|
|||
|
def get_import_templates(self):
|
|||
|
return [{
|
|||
|
'label': _('Import Template for Inventory Adjustments'),
|
|||
|
'template': '/stock/static/xlsx/stock_quant.xlsx'
|
|||
|
}]
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_forbidden_fields_write(self):
|
|||
|
""" Returns a list of fields user can't edit when he want to edit a quant in `inventory_mode`."""
|
|||
|
return ['product_id', 'location_id', 'lot_id', 'package_id', 'owner_id']
|
|||
|
|
|||
|
def write(self, vals):
|
|||
|
""" Override to handle the "inventory mode" and create the inventory move. """
|
|||
|
forbidden_fields = self._get_forbidden_fields_write()
|
|||
|
if self._is_inventory_mode() and any(field for field in forbidden_fields if field in vals.keys()):
|
|||
|
if any(quant.location_id.usage == 'inventory' for quant in self):
|
|||
|
# Do nothing when user tries to modify manually a inventory loss
|
|||
|
return
|
|||
|
self = self.sudo()
|
|||
|
raise UserError(_("Quant's editing is restricted, you can't do this operation."))
|
|||
|
return super(StockQuant, self).write(vals)
|
|||
|
|
|||
|
@api.ondelete(at_uninstall=False)
|
|||
|
def _unlink_except_wrong_permission(self):
|
|||
|
if not self.env.is_superuser():
|
|||
|
if not self.user_has_groups('stock.group_stock_manager'):
|
|||
|
raise UserError(_("Quants are auto-deleted when appropriate. If you must manually delete them, please ask a stock manager to do it."))
|
|||
|
self = self.with_context(inventory_mode=True)
|
|||
|
self.inventory_quantity = 0
|
|||
|
self._apply_inventory()
|
|||
|
|
|||
|
def action_view_stock_moves(self):
|
|||
|
self.ensure_one()
|
|||
|
action = self.env["ir.actions.actions"]._for_xml_id("stock.stock_move_line_action")
|
|||
|
action['domain'] = [
|
|||
|
'|',
|
|||
|
('location_id', '=', self.location_id.id),
|
|||
|
('location_dest_id', '=', self.location_id.id),
|
|||
|
('lot_id', '=', self.lot_id.id),
|
|||
|
]
|
|||
|
if self.package_id:
|
|||
|
action['domain'] += [
|
|||
|
'|',
|
|||
|
('package_id', '=', self.package_id.id),
|
|||
|
('result_package_id', '=', self.package_id.id),
|
|||
|
]
|
|||
|
action['context'] = literal_eval(action.get('context'))
|
|||
|
action['context']['search_default_product_id'] = self.product_id.id
|
|||
|
return action
|
|||
|
|
|||
|
def action_view_orderpoints(self):
|
|||
|
action = self.env['product.product'].action_view_orderpoints()
|
|||
|
action['domain'] = [('product_id', '=', self.product_id.id)]
|
|||
|
return action
|
|||
|
|
|||
|
@api.model
|
|||
|
def action_view_quants(self):
|
|||
|
self = self.with_context(search_default_internal_loc=1)
|
|||
|
self = self._set_view_context()
|
|||
|
return self._get_quants_action(extend=True)
|
|||
|
|
|||
|
@api.model
|
|||
|
def action_view_inventory(self):
|
|||
|
""" Similar to _get_quants_action except specific for inventory adjustments (i.e. inventory counts). """
|
|||
|
self = self._set_view_context()
|
|||
|
self._quant_tasks()
|
|||
|
|
|||
|
ctx = dict(self.env.context or {})
|
|||
|
ctx['no_at_date'] = True
|
|||
|
if self.user_has_groups('stock.group_stock_user') and not self.user_has_groups('stock.group_stock_manager'):
|
|||
|
ctx['search_default_my_count'] = True
|
|||
|
action = {
|
|||
|
'name': _('Inventory Adjustments'),
|
|||
|
'view_mode': 'list',
|
|||
|
'view_id': self.env.ref('stock.view_stock_quant_tree_inventory_editable').id,
|
|||
|
'res_model': 'stock.quant',
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'context': ctx,
|
|||
|
'domain': [('location_id.usage', 'in', ['internal', 'transit'])],
|
|||
|
'help': """
|
|||
|
<p class="o_view_nocontent_smiling_face">
|
|||
|
{}
|
|||
|
</p><p>
|
|||
|
{} <span class="fa fa-long-arrow-right"/> {}</p>
|
|||
|
""".format(_('Your stock is currently empty'),
|
|||
|
_('Press the CREATE button to define quantity for each product in your stock or import them from a spreadsheet throughout Favorites'),
|
|||
|
_('Import')),
|
|||
|
}
|
|||
|
return action
|
|||
|
|
|||
|
def action_apply_inventory(self):
|
|||
|
products_tracked_without_lot = []
|
|||
|
for quant in self:
|
|||
|
rounding = quant.product_uom_id.rounding
|
|||
|
if fields.Float.is_zero(quant.inventory_diff_quantity, precision_rounding=rounding)\
|
|||
|
and fields.Float.is_zero(quant.inventory_quantity, precision_rounding=rounding)\
|
|||
|
and fields.Float.is_zero(quant.quantity, precision_rounding=rounding):
|
|||
|
continue
|
|||
|
if quant.product_id.tracking in ['lot', 'serial'] and\
|
|||
|
not quant.lot_id and quant.inventory_quantity != quant.quantity and not quant.quantity:
|
|||
|
products_tracked_without_lot.append(quant.product_id.id)
|
|||
|
# for some reason if multi-record, env.context doesn't pass to wizards...
|
|||
|
ctx = dict(self.env.context or {})
|
|||
|
ctx['default_quant_ids'] = self.ids
|
|||
|
quants_outdated = self.filtered(lambda quant: quant.is_outdated)
|
|||
|
if quants_outdated:
|
|||
|
ctx['default_quant_to_fix_ids'] = quants_outdated.ids
|
|||
|
return {
|
|||
|
'name': _('Conflict in Inventory Adjustment'),
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'view_mode': 'form',
|
|||
|
'views': [(False, 'form')],
|
|||
|
'res_model': 'stock.inventory.conflict',
|
|||
|
'target': 'new',
|
|||
|
'context': ctx,
|
|||
|
}
|
|||
|
if products_tracked_without_lot:
|
|||
|
ctx['default_product_ids'] = products_tracked_without_lot
|
|||
|
return {
|
|||
|
'name': _('Tracked Products in Inventory Adjustment'),
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'view_mode': 'form',
|
|||
|
'views': [(False, 'form')],
|
|||
|
'res_model': 'stock.track.confirmation',
|
|||
|
'target': 'new',
|
|||
|
'context': ctx,
|
|||
|
}
|
|||
|
self._apply_inventory()
|
|||
|
self.inventory_quantity_set = False
|
|||
|
|
|||
|
def action_inventory_history(self):
|
|||
|
self.ensure_one()
|
|||
|
action = {
|
|||
|
'name': _('History'),
|
|||
|
'view_mode': 'list,form',
|
|||
|
'res_model': 'stock.move.line',
|
|||
|
'views': [(self.env.ref('stock.view_move_line_tree').id, 'list'), (False, 'form')],
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'context': {
|
|||
|
'search_default_inventory': 1,
|
|||
|
'search_default_done': 1,
|
|||
|
'search_default_product_id': self.product_id.id,
|
|||
|
},
|
|||
|
'domain': [
|
|||
|
('company_id', '=', self.company_id.id),
|
|||
|
'|',
|
|||
|
('location_id', '=', self.location_id.id),
|
|||
|
('location_dest_id', '=', self.location_id.id),
|
|||
|
],
|
|||
|
}
|
|||
|
if self.lot_id:
|
|||
|
action['context']['search_default_lot_id'] = self.lot_id.id
|
|||
|
if self.package_id:
|
|||
|
action['context']['search_default_package_id'] = self.package_id.id
|
|||
|
action['context']['search_default_result_package_id'] = self.package_id.id
|
|||
|
if self.owner_id:
|
|||
|
action['context']['search_default_owner_id'] = self.owner_id.id
|
|||
|
return action
|
|||
|
|
|||
|
def action_set_inventory_quantity(self):
|
|||
|
quants_already_set = self.filtered(lambda quant: quant.inventory_quantity_set)
|
|||
|
if quants_already_set:
|
|||
|
ctx = dict(self.env.context or {}, default_quant_ids=self.ids)
|
|||
|
view = self.env.ref('stock.inventory_warning_set_view', False)
|
|||
|
return {
|
|||
|
'name': _('Quantities Already Set'),
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'view_mode': 'form',
|
|||
|
'views': [(view.id, 'form')],
|
|||
|
'view_id': view.id,
|
|||
|
'res_model': 'stock.inventory.warning',
|
|||
|
'target': 'new',
|
|||
|
'context': ctx,
|
|||
|
}
|
|||
|
for quant in self:
|
|||
|
quant.inventory_quantity = quant.quantity
|
|||
|
self.user_id = self.env.user.id
|
|||
|
self.inventory_quantity_set = True
|
|||
|
|
|||
|
def action_reset(self):
|
|||
|
ctx = dict(self.env.context or {}, default_quant_ids=self.ids)
|
|||
|
view = self.env.ref('stock.inventory_warning_reset_view', False)
|
|||
|
return {
|
|||
|
'name': _('Quantities To Reset'),
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'view_mode': 'form',
|
|||
|
'views': [(view.id, 'form')],
|
|||
|
'view_id': view.id,
|
|||
|
'res_model': 'stock.inventory.warning',
|
|||
|
'target': 'new',
|
|||
|
'context': ctx,
|
|||
|
}
|
|||
|
|
|||
|
def action_set_inventory_quantity_to_zero(self):
|
|||
|
self.inventory_quantity = 0
|
|||
|
self.inventory_diff_quantity = 0
|
|||
|
self.inventory_quantity_set = False
|
|||
|
|
|||
|
def action_warning_duplicated_sn(self):
|
|||
|
return {
|
|||
|
'name': _('Warning Duplicated SN'),
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'res_model': 'stock.quant',
|
|||
|
'views': [(self.env.ref('stock.duplicated_sn_warning').id, 'form')],
|
|||
|
'target': 'new',
|
|||
|
}
|
|||
|
|
|||
|
@api.constrains('product_id')
|
|||
|
def check_product_id(self):
|
|||
|
if any(elem.product_id.type != 'product' for elem in self):
|
|||
|
raise ValidationError(_('Quants cannot be created for consumables or services.'))
|
|||
|
|
|||
|
@api.constrains('quantity')
|
|||
|
def check_quantity(self):
|
|||
|
sn_quants = self.filtered(lambda q: q.product_id.tracking == 'serial' and q.location_id.usage != 'inventory' and q.lot_id)
|
|||
|
if not sn_quants:
|
|||
|
return
|
|||
|
domain = expression.OR([
|
|||
|
[('product_id', '=', q.product_id.id), ('location_id', '=', q.location_id.id), ('lot_id', '=', q.lot_id.id)]
|
|||
|
for q in sn_quants
|
|||
|
])
|
|||
|
groups = self.read_group(
|
|||
|
domain,
|
|||
|
['quantity'],
|
|||
|
['product_id', 'location_id', 'lot_id'],
|
|||
|
orderby='id',
|
|||
|
lazy=False,
|
|||
|
)
|
|||
|
for group in groups:
|
|||
|
product = self.env['product.product'].browse(group['product_id'][0])
|
|||
|
lot = self.env['stock.lot'].browse(group['lot_id'][0])
|
|||
|
uom = product.uom_id
|
|||
|
if float_compare(abs(group['quantity']), 1, precision_rounding=uom.rounding) > 0:
|
|||
|
raise ValidationError(_('The serial number has already been assigned: \n Product: %s, Serial Number: %s') % (product.display_name, lot.name))
|
|||
|
|
|||
|
@api.constrains('location_id')
|
|||
|
def check_location_id(self):
|
|||
|
for quant in self:
|
|||
|
if quant.location_id.usage == 'view':
|
|||
|
raise ValidationError(_('You cannot take products from or deliver products to a location of type "view" (%s).') % quant.location_id.name)
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_removal_strategy(self, product_id, location_id):
|
|||
|
if product_id.categ_id.removal_strategy_id:
|
|||
|
return product_id.categ_id.removal_strategy_id.with_context(lang=None).method
|
|||
|
loc = location_id
|
|||
|
while loc:
|
|||
|
if loc.removal_strategy_id:
|
|||
|
return loc.removal_strategy_id.with_context(lang=None).method
|
|||
|
loc = loc.location_id
|
|||
|
return 'fifo'
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_removal_strategy_order(self, removal_strategy):
|
|||
|
if removal_strategy == 'fifo':
|
|||
|
return 'in_date ASC, id'
|
|||
|
elif removal_strategy == 'lifo':
|
|||
|
return 'in_date DESC, id DESC'
|
|||
|
elif removal_strategy == 'closest':
|
|||
|
return 'location_id ASC, id DESC'
|
|||
|
raise UserError(_('Removal strategy %s not implemented.') % (removal_strategy,))
|
|||
|
|
|||
|
def _get_gather_domain(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False):
|
|||
|
domain = [('product_id', '=', product_id.id)]
|
|||
|
if not strict:
|
|||
|
if lot_id:
|
|||
|
domain = expression.AND([['|', ('lot_id', '=', lot_id.id), ('lot_id', '=', False)], domain])
|
|||
|
if package_id:
|
|||
|
domain = expression.AND([[('package_id', '=', package_id.id)], domain])
|
|||
|
if owner_id:
|
|||
|
domain = expression.AND([[('owner_id', '=', owner_id.id)], domain])
|
|||
|
domain = expression.AND([[('location_id', 'child_of', location_id.id)], domain])
|
|||
|
else:
|
|||
|
domain = expression.AND([['|', ('lot_id', '=', lot_id.id), ('lot_id', '=', False)] if lot_id else [('lot_id', '=', False)], domain])
|
|||
|
domain = expression.AND([[('package_id', '=', package_id and package_id.id or False)], domain])
|
|||
|
domain = expression.AND([[('owner_id', '=', owner_id and owner_id.id or False)], domain])
|
|||
|
domain = expression.AND([[('location_id', '=', location_id.id)], domain])
|
|||
|
return domain
|
|||
|
|
|||
|
def _gather(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False):
|
|||
|
removal_strategy = self._get_removal_strategy(product_id, location_id)
|
|||
|
removal_strategy_order = self._get_removal_strategy_order(removal_strategy)
|
|||
|
domain = self._get_gather_domain(product_id, location_id, lot_id, package_id, owner_id, strict)
|
|||
|
quants_cache = self.env.context.get('quants_cache')
|
|||
|
if quants_cache is not None and strict:
|
|||
|
res = self.env['stock.quant']
|
|||
|
if lot_id:
|
|||
|
res |= quants_cache[
|
|||
|
product_id.id, location_id.id, lot_id.id,
|
|||
|
package_id and package_id.id or False,
|
|||
|
owner_id and owner_id.id or False]
|
|||
|
res |= quants_cache[
|
|||
|
product_id.id, location_id.id, False,
|
|||
|
package_id and package_id.id or False,
|
|||
|
owner_id and owner_id.id or False]
|
|||
|
else:
|
|||
|
res = self.search(domain, order=removal_strategy_order).sorted(lambda q: not q.lot_id)
|
|||
|
return res
|
|||
|
|
|||
|
def _get_quants_by_products_locations(self, product_ids, location_ids, extra_domain=False):
|
|||
|
res = defaultdict(lambda: self.env['stock.quant'])
|
|||
|
if product_ids and location_ids:
|
|||
|
domain = [
|
|||
|
('product_id', 'in', product_ids.ids),
|
|||
|
('location_id', 'child_of', location_ids.ids)
|
|||
|
]
|
|||
|
if extra_domain:
|
|||
|
domain = expression.AND([domain, extra_domain])
|
|||
|
needed_quants = self.env['stock.quant']._read_group(
|
|||
|
domain,
|
|||
|
['ids:array_agg(id)'],
|
|||
|
['product_id', 'location_id', 'lot_id', 'package_id', 'owner_id'],
|
|||
|
lazy=False)
|
|||
|
for group in needed_quants:
|
|||
|
res[(group['product_id'][0], group['location_id'][0],
|
|||
|
group['lot_id'] and group['lot_id'][0] if group['lot_id'] else False,
|
|||
|
group['package_id'][0] if group['package_id'] else False,
|
|||
|
group['owner_id'][0] if group['owner_id'] else False)
|
|||
|
] = self.env['stock.quant'].browse(group['ids'])
|
|||
|
return res
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_available_quantity(self, product_id, location_id, lot_id=None, package_id=None, owner_id=None, strict=False, allow_negative=False):
|
|||
|
""" Return the available quantity, i.e. the sum of `quantity` minus the sum of
|
|||
|
`reserved_quantity`, for the set of quants sharing the combination of `product_id,
|
|||
|
location_id` if `strict` is set to False or sharing the *exact same characteristics*
|
|||
|
otherwise.
|
|||
|
This method is called in the following usecases:
|
|||
|
- when a stock move checks its availability
|
|||
|
- when a stock move actually assign
|
|||
|
- when editing a move line, to check if the new value is forced or not
|
|||
|
- when validating a move line with some forced values and have to potentially unlink an
|
|||
|
equivalent move line in another picking
|
|||
|
In the two first usecases, `strict` should be set to `False`, as we don't know what exact
|
|||
|
quants we'll reserve, and the characteristics are meaningless in this context.
|
|||
|
In the last ones, `strict` should be set to `True`, as we work on a specific set of
|
|||
|
characteristics.
|
|||
|
|
|||
|
:return: available quantity as a float
|
|||
|
"""
|
|||
|
self = self.sudo()
|
|||
|
quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict)
|
|||
|
rounding = product_id.uom_id.rounding
|
|||
|
if product_id.tracking == 'none':
|
|||
|
available_quantity = sum(quants.mapped('quantity')) - sum(quants.mapped('reserved_quantity'))
|
|||
|
if allow_negative:
|
|||
|
return available_quantity
|
|||
|
else:
|
|||
|
return available_quantity if float_compare(available_quantity, 0.0, precision_rounding=rounding) >= 0.0 else 0.0
|
|||
|
else:
|
|||
|
availaible_quantities = {lot_id: 0.0 for lot_id in list(set(quants.mapped('lot_id'))) + ['untracked']}
|
|||
|
for quant in quants:
|
|||
|
if not quant.lot_id and strict and lot_id:
|
|||
|
continue
|
|||
|
if not quant.lot_id:
|
|||
|
availaible_quantities['untracked'] += quant.quantity - quant.reserved_quantity
|
|||
|
else:
|
|||
|
availaible_quantities[quant.lot_id] += quant.quantity - quant.reserved_quantity
|
|||
|
if allow_negative:
|
|||
|
return sum(availaible_quantities.values())
|
|||
|
else:
|
|||
|
return sum([available_quantity for available_quantity in availaible_quantities.values() if float_compare(available_quantity, 0, precision_rounding=rounding) > 0])
|
|||
|
|
|||
|
@api.onchange('location_id', 'product_id', 'lot_id', 'package_id', 'owner_id')
|
|||
|
def _onchange_location_or_product_id(self):
|
|||
|
vals = {}
|
|||
|
|
|||
|
# Once the new line is complete, fetch the new theoretical values.
|
|||
|
if self.product_id and self.location_id:
|
|||
|
# Sanity check if a lot has been set.
|
|||
|
if self.lot_id:
|
|||
|
if self.tracking == 'none' or self.product_id != self.lot_id.product_id:
|
|||
|
vals['lot_id'] = None
|
|||
|
|
|||
|
quant = self._gather(
|
|||
|
self.product_id, self.location_id, lot_id=self.lot_id,
|
|||
|
package_id=self.package_id, owner_id=self.owner_id, strict=True)
|
|||
|
if quant:
|
|||
|
self.quantity = quant.filtered(lambda q: q.lot_id == self.lot_id).quantity
|
|||
|
|
|||
|
# Special case: directly set the quantity to one for serial numbers,
|
|||
|
# it'll trigger `inventory_quantity` compute.
|
|||
|
if self.lot_id and self.tracking == 'serial':
|
|||
|
vals['inventory_quantity'] = 1
|
|||
|
vals['inventory_quantity_auto_apply'] = 1
|
|||
|
|
|||
|
if vals:
|
|||
|
self.update(vals)
|
|||
|
|
|||
|
@api.onchange('inventory_quantity')
|
|||
|
def _onchange_inventory_quantity(self):
|
|||
|
if self.location_id and self.location_id.usage == 'inventory':
|
|||
|
warning = {
|
|||
|
'title': _('You cannot modify inventory loss quantity'),
|
|||
|
'message': _(
|
|||
|
'Editing quantities in an Inventory Adjustment location is forbidden,'
|
|||
|
'those locations are used as counterpart when correcting the quantities.'
|
|||
|
)
|
|||
|
}
|
|||
|
return {'warning': warning}
|
|||
|
|
|||
|
@api.onchange('lot_id')
|
|||
|
def _onchange_serial_number(self):
|
|||
|
if self.lot_id and self.product_id.tracking == 'serial':
|
|||
|
message, dummy = self.env['stock.quant']._check_serial_number(self.product_id,
|
|||
|
self.lot_id,
|
|||
|
self.company_id)
|
|||
|
if message:
|
|||
|
return {'warning': {'title': _('Warning'), 'message': message}}
|
|||
|
|
|||
|
@api.onchange('product_id', 'company_id')
|
|||
|
def _onchange_product_id(self):
|
|||
|
if self.location_id:
|
|||
|
return
|
|||
|
if self.product_id.tracking in ['lot', 'serial']:
|
|||
|
previous_quants = self.env['stock.quant'].search([
|
|||
|
('product_id', '=', self.product_id.id),
|
|||
|
('location_id.usage', 'in', ['internal', 'transit'])], limit=1, order='create_date desc')
|
|||
|
if previous_quants:
|
|||
|
self.location_id = previous_quants.location_id
|
|||
|
if not self.location_id:
|
|||
|
company_id = self.company_id and self.company_id.id or self.env.company.id
|
|||
|
self.location_id = self.env['stock.warehouse'].search(
|
|||
|
[('company_id', '=', company_id)], limit=1).in_type_id.default_location_dest_id
|
|||
|
|
|||
|
def _apply_inventory(self):
|
|||
|
move_vals = []
|
|||
|
if not self.user_has_groups('stock.group_stock_manager'):
|
|||
|
raise UserError(_('Only a stock manager can validate an inventory adjustment.'))
|
|||
|
for quant in self:
|
|||
|
# Create and validate a move so that the quant matches its `inventory_quantity`.
|
|||
|
if float_compare(quant.inventory_diff_quantity, 0, precision_rounding=quant.product_uom_id.rounding) > 0:
|
|||
|
move_vals.append(
|
|||
|
quant._get_inventory_move_values(quant.inventory_diff_quantity,
|
|||
|
quant.product_id.with_company(quant.company_id).property_stock_inventory,
|
|||
|
quant.location_id))
|
|||
|
else:
|
|||
|
move_vals.append(
|
|||
|
quant._get_inventory_move_values(-quant.inventory_diff_quantity,
|
|||
|
quant.location_id,
|
|||
|
quant.product_id.with_company(quant.company_id).property_stock_inventory,
|
|||
|
out=True))
|
|||
|
moves = self.env['stock.move'].with_context(inventory_mode=False).create(move_vals)
|
|||
|
moves._action_done()
|
|||
|
self.location_id.write({'last_inventory_date': fields.Date.today()})
|
|||
|
date_by_location = {loc: loc._get_next_inventory_date() for loc in self.mapped('location_id')}
|
|||
|
for quant in self:
|
|||
|
quant.inventory_date = date_by_location[quant.location_id]
|
|||
|
self.write({'inventory_quantity': 0, 'user_id': False})
|
|||
|
self.write({'inventory_diff_quantity': 0})
|
|||
|
|
|||
|
@api.model
|
|||
|
def _update_available_quantity(self, product_id, location_id, quantity, lot_id=None, package_id=None, owner_id=None, in_date=None):
|
|||
|
""" Increase or decrease `reserved_quantity` of a set of quants for a given set of
|
|||
|
product_id/location_id/lot_id/package_id/owner_id.
|
|||
|
|
|||
|
:param product_id:
|
|||
|
:param location_id:
|
|||
|
:param quantity:
|
|||
|
:param lot_id:
|
|||
|
:param package_id:
|
|||
|
:param owner_id:
|
|||
|
:param datetime in_date: Should only be passed when calls to this method are done in
|
|||
|
order to move a quant. When creating a tracked quant, the
|
|||
|
current datetime will be used.
|
|||
|
:return: tuple (available_quantity, in_date as a datetime)
|
|||
|
"""
|
|||
|
self = self.sudo()
|
|||
|
quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=True)
|
|||
|
if lot_id and quantity > 0:
|
|||
|
quants = quants.filtered(lambda q: q.lot_id)
|
|||
|
|
|||
|
if location_id.should_bypass_reservation():
|
|||
|
incoming_dates = []
|
|||
|
else:
|
|||
|
incoming_dates = [quant.in_date for quant in quants if quant.in_date and
|
|||
|
float_compare(quant.quantity, 0, precision_rounding=quant.product_uom_id.rounding) > 0]
|
|||
|
if in_date:
|
|||
|
incoming_dates += [in_date]
|
|||
|
# If multiple incoming dates are available for a given lot_id/package_id/owner_id, we
|
|||
|
# consider only the oldest one as being relevant.
|
|||
|
if incoming_dates:
|
|||
|
in_date = min(incoming_dates)
|
|||
|
else:
|
|||
|
in_date = fields.Datetime.now()
|
|||
|
|
|||
|
quant = None
|
|||
|
if quants:
|
|||
|
# see _acquire_one_job for explanations
|
|||
|
self._cr.execute("SELECT id FROM stock_quant WHERE id IN %s ORDER BY lot_id LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED", [tuple(quants.ids)])
|
|||
|
stock_quant_result = self._cr.fetchone()
|
|||
|
if stock_quant_result:
|
|||
|
quant = self.browse(stock_quant_result[0])
|
|||
|
|
|||
|
if quant:
|
|||
|
quant.write({
|
|||
|
'quantity': quant.quantity + quantity,
|
|||
|
'in_date': in_date,
|
|||
|
})
|
|||
|
else:
|
|||
|
self.create({
|
|||
|
'product_id': product_id.id,
|
|||
|
'location_id': location_id.id,
|
|||
|
'quantity': quantity,
|
|||
|
'lot_id': lot_id and lot_id.id,
|
|||
|
'package_id': package_id and package_id.id,
|
|||
|
'owner_id': owner_id and owner_id.id,
|
|||
|
'in_date': in_date,
|
|||
|
})
|
|||
|
return self._get_available_quantity(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=True, allow_negative=True), in_date
|
|||
|
|
|||
|
def _raise_fix_unreserve_action(self, product_id):
|
|||
|
action = self.env.ref('stock.stock_quant_stock_move_line_desynchronization', raise_if_not_found=False)
|
|||
|
if action and self.user_has_groups('base.group_system'):
|
|||
|
msg = _(
|
|||
|
'It is not possible to reserve more products of %s than you have in stock.\n\n'
|
|||
|
'You can fix the discrepancies by clicking on the button below.\n'
|
|||
|
'The correction will remove the reservation of the impacted operations on all companies.\n'
|
|||
|
'If the error persists, or you see this message appear often, '
|
|||
|
'please submit a Support Ticket at https://www.odoo.com/help',
|
|||
|
product_id.display_name
|
|||
|
)
|
|||
|
raise RedirectWarning(msg, action.id, _('Fix discrepancies'))
|
|||
|
|
|||
|
@api.model
|
|||
|
def _update_reserved_quantity(self, product_id, location_id, quantity, lot_id=None, package_id=None, owner_id=None, strict=False):
|
|||
|
""" Increase the reserved quantity, i.e. increase `reserved_quantity` for the set of quants
|
|||
|
sharing the combination of `product_id, location_id` if `strict` is set to False or sharing
|
|||
|
the *exact same characteristics* otherwise. Typically, this method is called when reserving
|
|||
|
a move or updating a reserved move line. When reserving a chained move, the strict flag
|
|||
|
should be enabled (to reserve exactly what was brought). When the move is MTS,it could take
|
|||
|
anything from the stock, so we disable the flag. When editing a move line, we naturally
|
|||
|
enable the flag, to reflect the reservation according to the edition.
|
|||
|
|
|||
|
:return: a list of tuples (quant, quantity_reserved) showing on which quant the reservation
|
|||
|
was done and how much the system was able to reserve on it
|
|||
|
"""
|
|||
|
self = self.sudo()
|
|||
|
rounding = product_id.uom_id.rounding
|
|||
|
quants = self._gather(product_id, location_id, lot_id=lot_id, package_id=package_id, owner_id=owner_id, strict=strict)
|
|||
|
reserved_quants = []
|
|||
|
|
|||
|
if float_compare(quantity, 0, precision_rounding=rounding) > 0:
|
|||
|
# if we want to reserve
|
|||
|
available_quantity = sum(quants.filtered(lambda q: float_compare(q.quantity, 0, precision_rounding=rounding) > 0).mapped('quantity')) - sum(quants.mapped('reserved_quantity'))
|
|||
|
if float_compare(quantity, available_quantity, precision_rounding=rounding) > 0:
|
|||
|
raise UserError(_('It is not possible to reserve more products of %s than you have in stock.', product_id.display_name))
|
|||
|
elif float_compare(quantity, 0, precision_rounding=rounding) < 0:
|
|||
|
# if we want to unreserve
|
|||
|
available_quantity = sum(quants.mapped('reserved_quantity'))
|
|||
|
if float_compare(abs(quantity), available_quantity, precision_rounding=rounding) > 0:
|
|||
|
self._raise_fix_unreserve_action(product_id)
|
|||
|
raise UserError(_(
|
|||
|
'It is not possible to unreserve more products of %s than you have in stock.\n'
|
|||
|
'Please contact your system administrator to rectify this issue.',
|
|||
|
product_id.display_name
|
|||
|
))
|
|||
|
else:
|
|||
|
return reserved_quants
|
|||
|
|
|||
|
for quant in quants:
|
|||
|
if float_compare(quantity, 0, precision_rounding=rounding) > 0:
|
|||
|
max_quantity_on_quant = quant.quantity - quant.reserved_quantity
|
|||
|
if float_compare(max_quantity_on_quant, 0, precision_rounding=rounding) <= 0:
|
|||
|
continue
|
|||
|
max_quantity_on_quant = min(max_quantity_on_quant, quantity)
|
|||
|
quant.reserved_quantity += max_quantity_on_quant
|
|||
|
reserved_quants.append((quant, max_quantity_on_quant))
|
|||
|
quantity -= max_quantity_on_quant
|
|||
|
available_quantity -= max_quantity_on_quant
|
|||
|
else:
|
|||
|
max_quantity_on_quant = min(quant.reserved_quantity, abs(quantity))
|
|||
|
quant.reserved_quantity -= max_quantity_on_quant
|
|||
|
reserved_quants.append((quant, -max_quantity_on_quant))
|
|||
|
quantity += max_quantity_on_quant
|
|||
|
available_quantity += max_quantity_on_quant
|
|||
|
|
|||
|
if float_is_zero(quantity, precision_rounding=rounding) or float_is_zero(available_quantity, precision_rounding=rounding):
|
|||
|
break
|
|||
|
return reserved_quants
|
|||
|
|
|||
|
@api.model
|
|||
|
def _unlink_zero_quants(self):
|
|||
|
""" _update_available_quantity may leave quants with no
|
|||
|
quantity and no reserved_quantity. It used to directly unlink
|
|||
|
these zero quants but this proved to hurt the performance as
|
|||
|
this method is often called in batch and each unlink invalidate
|
|||
|
the cache. We defer the calls to unlink in this method.
|
|||
|
"""
|
|||
|
precision_digits = max(6, self.sudo().env.ref('product.decimal_product_uom').digits * 2)
|
|||
|
# Use a select instead of ORM search for UoM robustness.
|
|||
|
query = """SELECT id FROM stock_quant WHERE (round(quantity::numeric, %s) = 0 OR quantity IS NULL)
|
|||
|
AND round(reserved_quantity::numeric, %s) = 0
|
|||
|
AND (round(inventory_quantity::numeric, %s) = 0 OR inventory_quantity IS NULL)
|
|||
|
AND user_id IS NULL;"""
|
|||
|
params = (precision_digits, precision_digits, precision_digits)
|
|||
|
self.env.cr.execute(query, params)
|
|||
|
quant_ids = self.env['stock.quant'].browse([quant['id'] for quant in self.env.cr.dictfetchall()])
|
|||
|
quant_ids.sudo().unlink()
|
|||
|
|
|||
|
@api.model
|
|||
|
def _merge_quants(self):
|
|||
|
""" In a situation where one transaction is updating a quant via
|
|||
|
`_update_available_quantity` and another concurrent one calls this function with the same
|
|||
|
argument, we’ll create a new quant in order for these transactions to not rollback. This
|
|||
|
method will find and deduplicate these quants.
|
|||
|
"""
|
|||
|
params = []
|
|||
|
query = """WITH
|
|||
|
dupes AS (
|
|||
|
SELECT min(id) as to_update_quant_id,
|
|||
|
(array_agg(id ORDER BY id))[2:array_length(array_agg(id), 1)] as to_delete_quant_ids,
|
|||
|
SUM(reserved_quantity) as reserved_quantity,
|
|||
|
SUM(inventory_quantity) as inventory_quantity,
|
|||
|
SUM(quantity) as quantity,
|
|||
|
MIN(in_date) as in_date
|
|||
|
FROM stock_quant
|
|||
|
"""
|
|||
|
if self._ids:
|
|||
|
query += """
|
|||
|
WHERE
|
|||
|
location_id in %s
|
|||
|
AND product_id in %s
|
|||
|
"""
|
|||
|
params = [tuple(self.location_id.ids), tuple(self.product_id.ids)]
|
|||
|
query += """
|
|||
|
GROUP BY product_id, company_id, location_id, lot_id, package_id, owner_id
|
|||
|
HAVING count(id) > 1
|
|||
|
),
|
|||
|
_up AS (
|
|||
|
UPDATE stock_quant q
|
|||
|
SET quantity = d.quantity,
|
|||
|
reserved_quantity = d.reserved_quantity,
|
|||
|
inventory_quantity = d.inventory_quantity,
|
|||
|
in_date = d.in_date
|
|||
|
FROM dupes d
|
|||
|
WHERE d.to_update_quant_id = q.id
|
|||
|
)
|
|||
|
DELETE FROM stock_quant WHERE id in (SELECT unnest(to_delete_quant_ids) from dupes)
|
|||
|
"""
|
|||
|
try:
|
|||
|
with self.env.cr.savepoint():
|
|||
|
self.env.cr.execute(query, params)
|
|||
|
self.env.invalidate_all()
|
|||
|
except Error as e:
|
|||
|
_logger.info('an error occurred while merging quants: %s', e.pgerror)
|
|||
|
|
|||
|
@api.model
|
|||
|
def _quant_tasks(self):
|
|||
|
self._merge_quants()
|
|||
|
self._unlink_zero_quants()
|
|||
|
|
|||
|
@api.model
|
|||
|
def _is_inventory_mode(self):
|
|||
|
""" Used to control whether a quant was written on or created during an
|
|||
|
"inventory session", meaning a mode where we need to create the stock.move
|
|||
|
record necessary to be consistent with the `inventory_quantity` field.
|
|||
|
"""
|
|||
|
return self.env.context.get('inventory_mode') and self.user_has_groups('stock.group_stock_user')
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_inventory_fields_create(self):
|
|||
|
""" Returns a list of fields user can edit when he want to create a quant in `inventory_mode`.
|
|||
|
"""
|
|||
|
return ['product_id', 'location_id', 'lot_id', 'package_id', 'owner_id'] + self._get_inventory_fields_write()
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_inventory_fields_write(self):
|
|||
|
""" Returns a list of fields user can edit when he want to edit a quant in `inventory_mode`.
|
|||
|
"""
|
|||
|
fields = ['inventory_quantity', 'inventory_quantity_auto_apply', 'inventory_diff_quantity',
|
|||
|
'inventory_date', 'user_id', 'inventory_quantity_set', 'is_outdated', 'lot_id']
|
|||
|
return fields
|
|||
|
|
|||
|
def _get_inventory_move_values(self, qty, location_id, location_dest_id, out=False):
|
|||
|
""" Called when user manually set a new quantity (via `inventory_quantity`)
|
|||
|
just before creating the corresponding stock move.
|
|||
|
|
|||
|
:param location_id: `stock.location`
|
|||
|
:param location_dest_id: `stock.location`
|
|||
|
:param out: boolean to set on True when the move go to inventory adjustment location.
|
|||
|
:return: dict with all values needed to create a new `stock.move` with its move line.
|
|||
|
"""
|
|||
|
self.ensure_one()
|
|||
|
if fields.Float.is_zero(qty, 0, precision_rounding=self.product_uom_id.rounding):
|
|||
|
name = _('Product Quantity Confirmed')
|
|||
|
else:
|
|||
|
name = _('Product Quantity Updated')
|
|||
|
|
|||
|
return {
|
|||
|
'name': self.env.context.get('inventory_name') or name,
|
|||
|
'product_id': self.product_id.id,
|
|||
|
'product_uom': self.product_uom_id.id,
|
|||
|
'product_uom_qty': qty,
|
|||
|
'company_id': self.company_id.id or self.env.company.id,
|
|||
|
'state': 'confirmed',
|
|||
|
'location_id': location_id.id,
|
|||
|
'location_dest_id': location_dest_id.id,
|
|||
|
'restrict_partner_id': self.owner_id.id,
|
|||
|
'is_inventory': True,
|
|||
|
'move_line_ids': [(0, 0, {
|
|||
|
'product_id': self.product_id.id,
|
|||
|
'product_uom_id': self.product_uom_id.id,
|
|||
|
'qty_done': qty,
|
|||
|
'location_id': location_id.id,
|
|||
|
'location_dest_id': location_dest_id.id,
|
|||
|
'company_id': self.company_id.id or self.env.company.id,
|
|||
|
'lot_id': self.lot_id.id,
|
|||
|
'package_id': out and self.package_id.id or False,
|
|||
|
'result_package_id': (not out) and self.package_id.id or False,
|
|||
|
'owner_id': self.owner_id.id,
|
|||
|
})]
|
|||
|
}
|
|||
|
|
|||
|
def _set_view_context(self):
|
|||
|
""" Adds context when opening quants related views. """
|
|||
|
if not self.user_has_groups('stock.group_stock_multi_locations'):
|
|||
|
company_user = self.env.company
|
|||
|
warehouse = self.env['stock.warehouse'].search([('company_id', '=', company_user.id)], limit=1)
|
|||
|
if warehouse:
|
|||
|
self = self.with_context(default_location_id=warehouse.lot_stock_id.id, hide_location=not self.env.context.get('always_show_loc', False))
|
|||
|
|
|||
|
# If user have rights to write on quant, we set quants in inventory mode.
|
|||
|
if self.user_has_groups('stock.group_stock_user'):
|
|||
|
self = self.with_context(inventory_mode=True)
|
|||
|
return self
|
|||
|
|
|||
|
@api.model
|
|||
|
def _get_quants_action(self, domain=None, extend=False):
|
|||
|
""" Returns an action to open (non-inventory adjustment) quant view.
|
|||
|
Depending of the context (user have right to be inventory mode or not),
|
|||
|
the list view will be editable or readonly.
|
|||
|
|
|||
|
:param domain: List for the domain, empty by default.
|
|||
|
:param extend: If True, enables form, graph and pivot views. False by default.
|
|||
|
"""
|
|||
|
if not self.env['ir.config_parameter'].sudo().get_param('stock.skip_quant_tasks'):
|
|||
|
self._quant_tasks()
|
|||
|
ctx = dict(self.env.context or {})
|
|||
|
ctx['inventory_report_mode'] = True
|
|||
|
ctx.pop('group_by', None)
|
|||
|
action = {
|
|||
|
'name': _('Locations'),
|
|||
|
'view_type': 'tree',
|
|||
|
'view_mode': 'list,form',
|
|||
|
'res_model': 'stock.quant',
|
|||
|
'type': 'ir.actions.act_window',
|
|||
|
'context': ctx,
|
|||
|
'domain': domain or [],
|
|||
|
'help': """
|
|||
|
<p class="o_view_nocontent_empty_folder">{}</p>
|
|||
|
<p>{}</p>
|
|||
|
""".format(_('No Stock On Hand'),
|
|||
|
_('This analysis gives you an overview of the current stock level of your products.')),
|
|||
|
}
|
|||
|
|
|||
|
target_action = self.env.ref('stock.dashboard_open_quants', False)
|
|||
|
if target_action:
|
|||
|
action['id'] = target_action.id
|
|||
|
|
|||
|
form_view = self.env.ref('stock.view_stock_quant_form_editable').id
|
|||
|
if self.env.context.get('inventory_mode') and self.user_has_groups('stock.group_stock_manager'):
|
|||
|
action['view_id'] = self.env.ref('stock.view_stock_quant_tree_editable').id
|
|||
|
else:
|
|||
|
action['view_id'] = self.env.ref('stock.view_stock_quant_tree').id
|
|||
|
action.update({
|
|||
|
'views': [
|
|||
|
(action['view_id'], 'list'),
|
|||
|
(form_view, 'form'),
|
|||
|
],
|
|||
|
})
|
|||
|
if extend:
|
|||
|
action.update({
|
|||
|
'view_mode': 'tree,form,pivot,graph',
|
|||
|
'views': [
|
|||
|
(action['view_id'], 'list'),
|
|||
|
(form_view, 'form'),
|
|||
|
(self.env.ref('stock.view_stock_quant_pivot').id, 'pivot'),
|
|||
|
(self.env.ref('stock.stock_quant_view_graph').id, 'graph'),
|
|||
|
],
|
|||
|
})
|
|||
|
return action
|
|||
|
|
|||
|
@api.model
|
|||
|
def _check_serial_number(self, product_id, lot_id, company_id, source_location_id=None, ref_doc_location_id=None):
|
|||
|
""" Checks for duplicate serial numbers (SN) when assigning a SN (i.e. no source_location_id)
|
|||
|
and checks for potential incorrect location selection of a SN when using a SN (i.e.
|
|||
|
source_location_id). Returns warning message of all locations the SN is located at and
|
|||
|
(optionally) a recommended source location of the SN (when using SN from incorrect location).
|
|||
|
This function is designed to be used by onchange functions across differing situations including,
|
|||
|
but not limited to scrap, incoming picking SN encoding, and outgoing picking SN selection.
|
|||
|
|
|||
|
:param product_id: `product.product` product to check SN for
|
|||
|
:param lot_id: `stock.production.lot` SN to check
|
|||
|
:param company_id: `res.company` company to check against (i.e. we ignore duplicate SNs across
|
|||
|
different companies)
|
|||
|
:param source_location_id: `stock.location` optional source location if using the SN rather
|
|||
|
than assigning it
|
|||
|
:param ref_doc_location_id: `stock.location` optional reference document location for
|
|||
|
determining recommended location. This is param expected to only be used when a
|
|||
|
`source_location_id` is provided.
|
|||
|
:return: tuple(message, recommended_location) If not None, message is a string expected to be
|
|||
|
used in warning message dict and recommended_location is a `location_id`
|
|||
|
"""
|
|||
|
message = None
|
|||
|
recommended_location = None
|
|||
|
if product_id.tracking == 'serial':
|
|||
|
quants = self.env['stock.quant'].search([('product_id', '=', product_id.id),
|
|||
|
('lot_id', '=', lot_id.id),
|
|||
|
('quantity', '!=', 0),
|
|||
|
'|', ('location_id.usage', '=', 'customer'),
|
|||
|
'&', ('company_id', '=', company_id.id),
|
|||
|
('location_id.usage', 'in', ('internal', 'transit'))])
|
|||
|
sn_locations = quants.mapped('location_id')
|
|||
|
if quants:
|
|||
|
if not source_location_id:
|
|||
|
# trying to assign an already existing SN
|
|||
|
message = _('The Serial Number (%s) is already used in these location(s): %s.\n\n'
|
|||
|
'Is this expected? For example this can occur if a delivery operation is validated '
|
|||
|
'before its corresponding receipt operation is validated. In this case the issue will be solved '
|
|||
|
'automatically once all steps are completed. Otherwise, the serial number should be corrected to '
|
|||
|
'prevent inconsistent data.',
|
|||
|
lot_id.name, ', '.join(sn_locations.mapped('display_name')))
|
|||
|
|
|||
|
elif source_location_id and source_location_id not in sn_locations:
|
|||
|
# using an existing SN in the wrong location
|
|||
|
recommended_location = self.env['stock.location']
|
|||
|
if ref_doc_location_id:
|
|||
|
for location in sn_locations:
|
|||
|
if ref_doc_location_id.parent_path in location.parent_path:
|
|||
|
recommended_location = location
|
|||
|
break
|
|||
|
else:
|
|||
|
for location in sn_locations:
|
|||
|
if location.usage != 'customer':
|
|||
|
recommended_location = location
|
|||
|
break
|
|||
|
if recommended_location:
|
|||
|
message = _('Serial number (%s) is not located in %s, but is located in location(s): %s.\n\n'
|
|||
|
'Source location for this move will be changed to %s',
|
|||
|
lot_id.name, source_location_id.display_name, ', '.join(sn_locations.mapped('display_name')), recommended_location.display_name)
|
|||
|
else:
|
|||
|
message = _('Serial number (%s) is not located in %s, but is located in location(s): %s.\n\n'
|
|||
|
'Please correct this to prevent inconsistent data.',
|
|||
|
lot_id.name, source_location_id.display_name, ', '.join(sn_locations.mapped('display_name')))
|
|||
|
return message, recommended_location
|
|||
|
|
|||
|
|
|||
|
class QuantPackage(models.Model):
|
|||
|
""" Packages containing quants and/or other packages """
|
|||
|
_name = "stock.quant.package"
|
|||
|
_description = "Packages"
|
|||
|
_order = 'name'
|
|||
|
|
|||
|
name = fields.Char(
|
|||
|
'Package Reference', copy=False, index='trigram', required=True,
|
|||
|
default=lambda self: self.env['ir.sequence'].next_by_code('stock.quant.package') or _('Unknown Pack'))
|
|||
|
quant_ids = fields.One2many('stock.quant', 'package_id', 'Bulk Content', readonly=True,
|
|||
|
domain=['|', ('quantity', '!=', 0), ('reserved_quantity', '!=', 0)])
|
|||
|
package_type_id = fields.Many2one(
|
|||
|
'stock.package.type', 'Package Type', index=True)
|
|||
|
location_id = fields.Many2one(
|
|||
|
'stock.location', 'Location', compute='_compute_package_info',
|
|||
|
index=True, readonly=True, store=True)
|
|||
|
company_id = fields.Many2one(
|
|||
|
'res.company', 'Company', compute='_compute_package_info',
|
|||
|
index=True, readonly=True, store=True)
|
|||
|
owner_id = fields.Many2one(
|
|||
|
'res.partner', 'Owner', compute='_compute_package_info', search='_search_owner',
|
|||
|
index='btree_not_null', readonly=True, compute_sudo=True)
|
|||
|
package_use = fields.Selection([
|
|||
|
('disposable', 'Disposable Box'),
|
|||
|
('reusable', 'Reusable Box'),
|
|||
|
], string='Package Use', default='disposable', required=True,
|
|||
|
help="""Reusable boxes are used for batch picking and emptied afterwards to be reused. In the barcode application, scanning a reusable box will add the products in this box.
|
|||
|
Disposable boxes aren't reused, when scanning a disposable box in the barcode application, the contained products are added to the transfer.""")
|
|||
|
valid_sscc = fields.Boolean('Package name is valid SSCC', compute='_compute_valid_sscc')
|
|||
|
pack_date = fields.Date('Pack Date', default=fields.Date.today)
|
|||
|
|
|||
|
@api.depends('quant_ids.package_id', 'quant_ids.location_id', 'quant_ids.company_id', 'quant_ids.owner_id', 'quant_ids.quantity', 'quant_ids.reserved_quantity')
|
|||
|
def _compute_package_info(self):
|
|||
|
for package in self:
|
|||
|
values = {'location_id': False, 'owner_id': False}
|
|||
|
if package.quant_ids:
|
|||
|
values['location_id'] = package.quant_ids[0].location_id
|
|||
|
if all(q.owner_id == package.quant_ids[0].owner_id for q in package.quant_ids):
|
|||
|
values['owner_id'] = package.quant_ids[0].owner_id
|
|||
|
if all(q.company_id == package.quant_ids[0].company_id for q in package.quant_ids):
|
|||
|
values['company_id'] = package.quant_ids[0].company_id
|
|||
|
package.location_id = values['location_id']
|
|||
|
package.company_id = values.get('company_id')
|
|||
|
package.owner_id = values['owner_id']
|
|||
|
|
|||
|
@api.depends('name')
|
|||
|
def _compute_valid_sscc(self):
|
|||
|
self.valid_sscc = False
|
|||
|
for package in self:
|
|||
|
if package.name:
|
|||
|
package.valid_sscc = check_barcode_encoding(package.name, 'sscc')
|
|||
|
|
|||
|
def _search_owner(self, operator, value):
|
|||
|
if value:
|
|||
|
packs = self.search([('quant_ids.owner_id', operator, value)])
|
|||
|
else:
|
|||
|
packs = self.search([('quant_ids', operator, value)])
|
|||
|
if packs:
|
|||
|
return [('id', 'in', packs.ids)]
|
|||
|
else:
|
|||
|
return [('id', '=', False)]
|
|||
|
|
|||
|
def unpack(self):
|
|||
|
# remove inventory mode
|
|||
|
self = self.with_context(inventory_mode=False)
|
|||
|
unpacked_quants = self.env['stock.quant']
|
|||
|
for package in self:
|
|||
|
move_line_to_modify = self.env['stock.move.line'].search([
|
|||
|
('package_id', '=', package.id),
|
|||
|
('state', 'in', ('assigned', 'partially_available')),
|
|||
|
('reserved_qty', '!=', 0),
|
|||
|
])
|
|||
|
move_line_to_modify.write({'package_id': False})
|
|||
|
unpacked_quants |= package.quant_ids
|
|||
|
package.mapped('quant_ids').sudo().write({'package_id': False})
|
|||
|
|
|||
|
# Quant clean-up, mostly to avoid multiple quants of the same product. For example, unpack
|
|||
|
# 2 packages of 50, then reserve 100 => a quant of -50 is created at transfer validation.
|
|||
|
unpacked_quants._quant_tasks()
|
|||
|
|
|||
|
def action_view_picking(self):
|
|||
|
action = self.env["ir.actions.actions"]._for_xml_id("stock.action_picking_tree_all")
|
|||
|
domain = ['|', ('result_package_id', 'in', self.ids), ('package_id', 'in', self.ids)]
|
|||
|
pickings = self.env['stock.move.line'].search(domain).mapped('picking_id')
|
|||
|
action['domain'] = [('id', 'in', pickings.ids)]
|
|||
|
return action
|
|||
|
|
|||
|
def _check_move_lines_map_quant(self, move_lines, field):
|
|||
|
""" This method checks that all product (quants) of self (package) are well present in the `move_line_ids`. """
|
|||
|
precision_digits = self.env['decimal.precision'].precision_get('Product Unit of Measure')
|
|||
|
|
|||
|
def _keys_groupby(record):
|
|||
|
return record.product_id, record.lot_id
|
|||
|
|
|||
|
grouped_quants = {}
|
|||
|
for k, g in groupby(self.quant_ids, key=_keys_groupby):
|
|||
|
grouped_quants[k] = sum(self.env['stock.quant'].concat(*g).mapped('quantity'))
|
|||
|
|
|||
|
grouped_ops = {}
|
|||
|
for k, g in groupby(move_lines, key=_keys_groupby):
|
|||
|
grouped_ops[k] = sum(self.env['stock.move.line'].concat(*g).mapped(field))
|
|||
|
|
|||
|
if any(not float_is_zero(grouped_quants.get(key, 0) - grouped_ops.get(key, 0), precision_digits=precision_digits) for key in grouped_quants) \
|
|||
|
or any(not float_is_zero(grouped_ops.get(key, 0) - grouped_quants.get(key, 0), precision_digits=precision_digits) for key in grouped_ops):
|
|||
|
return False
|
|||
|
return True
|