Odoo18-Base/addons/delivery/models/stock_picking.py
2025-03-10 11:12:23 +07:00

395 lines
20 KiB
Python

# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
from collections import defaultdict
from datetime import date
from markupsafe import Markup
from odoo import models, fields, api, _, SUPERUSER_ID
from odoo.exceptions import UserError
from odoo.tools.sql import column_exists, create_column
class StockQuantPackage(models.Model):
_inherit = "stock.quant.package"
@api.depends('quant_ids', 'package_type_id')
def _compute_weight(self):
if self.env.context.get('picking_id'):
package_weights = defaultdict(float)
# Ordering by qty_done prevents the default ordering by groupby fields that can inject multiple Left Joins in the resulting query.
res_groups = self.env['stock.move.line'].read_group(
[('result_package_id', 'in', self.ids), ('product_id', '!=', False), ('picking_id', '=', self.env.context['picking_id'])],
['id:count'],
['result_package_id', 'product_id', 'product_uom_id', 'qty_done'],
lazy=False, orderby='qty_done asc'
)
for res_group in res_groups:
product_id = self.env['product.product'].browse(res_group['product_id'][0])
product_uom_id = self.env['uom.uom'].browse(res_group['product_uom_id'][0])
package_weights[res_group['result_package_id'][0]] += (
res_group['__count']
* product_uom_id._compute_quantity(res_group['qty_done'], product_id.uom_id)
* product_id.weight
)
for package in self:
weight = package.package_type_id.base_weight or 0.0
if self.env.context.get('picking_id'):
package.weight = weight + package_weights[package.id]
else:
for quant in package.quant_ids:
weight += quant.quantity * quant.product_id.weight
package.weight = weight
def _get_default_weight_uom(self):
return self.env['product.template']._get_weight_uom_name_from_ir_config_parameter()
def _compute_weight_uom_name(self):
for package in self:
package.weight_uom_name = self.env['product.template']._get_weight_uom_name_from_ir_config_parameter()
def _compute_weight_is_kg(self):
self.weight_is_kg = False
uom_id = self.env['product.template']._get_weight_uom_id_from_ir_config_parameter()
if uom_id == self.env.ref('uom.product_uom_kgm'):
self.weight_is_kg = True
self.weight_uom_rounding = uom_id.rounding
weight = fields.Float(compute='_compute_weight', digits='Stock Weight', help="Total weight of all the products contained in the package.")
weight_uom_name = fields.Char(string='Weight unit of measure label', compute='_compute_weight_uom_name', readonly=True, default=_get_default_weight_uom)
weight_is_kg = fields.Boolean("Technical field indicating whether weight uom is kg or not (i.e. lb)", compute="_compute_weight_is_kg")
weight_uom_rounding = fields.Float("Technical field indicating weight's number of decimal places", compute="_compute_weight_is_kg")
shipping_weight = fields.Float(string='Shipping Weight', help="Total weight of the package.")
class StockPicking(models.Model):
_inherit = 'stock.picking'
def _auto_init(self):
if not column_exists(self.env.cr, "stock_picking", "weight"):
# In order to speed up module installation when dealing with hefty data
# We create the column weight manually, but the computation will be skipped
# Therefore we do the computation in a query by getting weight sum from stock moves
create_column(self.env.cr, "stock_picking", "weight", "numeric")
self.env.cr.execute("""
WITH computed_weight AS (
SELECT SUM(weight) AS weight_sum, picking_id
FROM stock_move
WHERE picking_id IS NOT NULL
GROUP BY picking_id
)
UPDATE stock_picking
SET weight = weight_sum
FROM computed_weight
WHERE stock_picking.id = computed_weight.picking_id;
""")
return super()._auto_init()
@api.depends('move_line_ids', 'move_line_ids.result_package_id')
def _compute_packages(self):
packages = {
res["picking_id"][0]: set(res["result_package_id"])
for res in self.env["stock.move.line"].read_group(
[("picking_id", "in", self.ids), ("result_package_id", "!=", False)],
["result_package_id:array_agg"],
["picking_id"],
lazy=False, orderby="picking_id asc",
)
}
for picking in self:
picking.package_ids = list(packages.get(picking.id, []))
@api.depends('move_line_ids', 'move_line_ids.result_package_id', 'move_line_ids.product_uom_id', 'move_line_ids.qty_done')
def _compute_bulk_weight(self):
picking_weights = defaultdict(float)
# Ordering by qty_done prevents the default ordering by groupby fields that can inject multiple Left Joins in the resulting query.
res_groups = self.env['stock.move.line'].read_group(
[('picking_id', 'in', self.ids), ('product_id', '!=', False), ('result_package_id', '=', False)],
['id:count'],
['picking_id', 'product_id', 'product_uom_id', 'qty_done'],
lazy=False, orderby='qty_done asc'
)
products_by_id = {
product_res['id']: (product_res['uom_id'][0], product_res['weight'])
for product_res in
self.env['product.product'].with_context(active_test=False).search_read(
[('id', 'in', list(set(grp["product_id"][0] for grp in res_groups)))], ['uom_id', 'weight'])
}
for res_group in res_groups:
uom_id, weight = products_by_id[res_group['product_id'][0]]
uom = self.env['uom.uom'].browse(uom_id)
product_uom_id = self.env['uom.uom'].browse(res_group['product_uom_id'][0])
picking_weights[res_group['picking_id'][0]] += (
res_group['__count']
* product_uom_id._compute_quantity(res_group['qty_done'], uom)
* weight
)
for picking in self:
picking.weight_bulk = picking_weights[picking.id]
@api.depends('move_line_ids.result_package_id', 'move_line_ids.result_package_id.shipping_weight', 'weight_bulk')
def _compute_shipping_weight(self):
for picking in self:
# if shipping weight is not assigned => default to calculated product weight
picking.shipping_weight = (
picking.weight_bulk +
sum(pack.shipping_weight or pack.weight for pack in picking.package_ids.sudo())
)
def _get_default_weight_uom(self):
return self.env['product.template']._get_weight_uom_name_from_ir_config_parameter()
def _compute_weight_uom_name(self):
for package in self:
package.weight_uom_name = self.env['product.template']._get_weight_uom_name_from_ir_config_parameter()
carrier_price = fields.Float(string="Shipping Cost")
delivery_type = fields.Selection(related='carrier_id.delivery_type', readonly=True)
carrier_id = fields.Many2one("delivery.carrier", string="Carrier", check_company=True)
weight = fields.Float(compute='_cal_weight', digits='Stock Weight', store=True, help="Total weight of the products in the picking.", compute_sudo=True)
carrier_tracking_ref = fields.Char(string='Tracking Reference', copy=False)
carrier_tracking_url = fields.Char(string='Tracking URL', compute='_compute_carrier_tracking_url')
weight_uom_name = fields.Char(string='Weight unit of measure label', compute='_compute_weight_uom_name', readonly=True, default=_get_default_weight_uom)
package_ids = fields.Many2many('stock.quant.package', compute='_compute_packages', string='Packages')
weight_bulk = fields.Float('Bulk Weight', compute='_compute_bulk_weight', help="Total weight of products which are not in a package.")
shipping_weight = fields.Float("Weight for Shipping", compute='_compute_shipping_weight',
help="Total weight of packages and products not in a package. Packages with no shipping weight specified will default to their products' total weight. This is the weight used to compute the cost of the shipping.")
is_return_picking = fields.Boolean(compute='_compute_return_picking')
return_label_ids = fields.One2many('ir.attachment', compute='_compute_return_label')
destination_country_code = fields.Char(related='partner_id.country_id.code', string="Destination Country")
@api.depends('carrier_id', 'carrier_tracking_ref')
def _compute_carrier_tracking_url(self):
for picking in self:
picking.carrier_tracking_url = picking.carrier_id.get_tracking_link(picking) if picking.carrier_id and picking.carrier_tracking_ref else False
@api.depends('carrier_id', 'move_ids_without_package')
def _compute_return_picking(self):
for picking in self:
if picking.carrier_id and picking.carrier_id.can_generate_return:
picking.is_return_picking = any(m.origin_returned_move_id for m in picking.move_ids_without_package)
else:
picking.is_return_picking = False
def _compute_return_label(self):
for picking in self:
if picking.carrier_id:
picking.return_label_ids = self.env['ir.attachment'].search([('res_model', '=', 'stock.picking'), ('res_id', '=', picking.id), ('name', 'like', '%s%%' % picking.carrier_id.get_return_label_prefix())])
else:
picking.return_label_ids = False
def get_multiple_carrier_tracking(self):
self.ensure_one()
try:
return json.loads(self.carrier_tracking_url)
except (ValueError, TypeError):
return False
@api.depends('move_ids.weight')
def _cal_weight(self):
for picking in self:
picking.weight = sum(move.weight for move in picking.move_ids if move.state != 'cancel')
def _carrier_exception_note(self, exception):
self.ensure_one()
line_1 = _("Exception occurred with respect to carrier on the transfer")
line_2 = _("Manual actions might be needed.")
line_3 = _("Exception:")
return Markup('<div> {line_1} <a href="#" data-oe-model="stock.picking" data-oe-id="{picking_id}"> {picking_name}</a>. {line_2}<div class="mt16"><p>{line_3} {exception}</p></div></div>').format(line_1=line_1, line_2=line_2, line_3=line_3, picking_id=self.id, picking_name=self.name, exception=exception)
def _send_confirmation_email(self):
# The carrier's API processes validity checks and parcels generation one picking at a time.
# However, since a UserError of any of the picking will cause a rollback of the entire batch
# on Odoo's side and since pickings that were already processed on the carrier's side must
# stay validated, UserErrors might need to be replaced by activity warnings.
processed_carrier_picking = False
for pick in self:
try:
if pick.carrier_id and pick.carrier_id.integration_level == 'rate_and_ship' and pick.picking_type_code != 'incoming' and not pick.carrier_tracking_ref and pick.picking_type_id.print_label:
pick.sudo().send_to_shipper()
pick._check_carrier_details_compliance()
if pick.carrier_id:
processed_carrier_picking = True
except (UserError) as e:
if processed_carrier_picking:
# We can not raise a UserError at this point
exception_message = str(e)
pick.message_post(body=exception_message, message_type='notification')
pick.sudo().activity_schedule(
'mail.mail_activity_data_warning',
date.today(),
note=pick._carrier_exception_note(exception_message),
user_id=pick.user_id.id or self.env.user.id or SUPERUSER_ID,
)
else:
raise e
return super(StockPicking, self)._send_confirmation_email()
def _pre_put_in_pack_hook(self, move_line_ids):
res = super(StockPicking, self)._pre_put_in_pack_hook(move_line_ids)
if not res:
if move_line_ids.carrier_id:
if len(move_line_ids.carrier_id) > 1 or any(not ml.carrier_id for ml in move_line_ids):
# avoid (duplicate) costs for products
raise UserError(_("You cannot pack products into the same package when they have different carriers (i.e. check that all of their transfers have a carrier assigned and are using the same carrier)."))
return self._set_delivery_package_type(batch_pack=len(move_line_ids.picking_id) > 1)
else:
return res
def _set_delivery_package_type(self, batch_pack=False):
""" This method returns an action allowing to set the package type and the shipping weight
on the stock.quant.package.
"""
self.ensure_one()
view_id = self.env.ref('delivery.choose_delivery_package_view_form').id
context = dict(
self.env.context,
current_package_carrier_type=self.carrier_id.delivery_type,
default_picking_id=self.id,
batch_pack=batch_pack,
)
# As we pass the `delivery_type` ('fixed' or 'base_on_rule' by default) in a key who
# correspond to the `package_carrier_type` ('none' to default), we make a conversion.
# No need conversion for other carriers as the `delivery_type` and
#`package_carrier_type` will be the same in these cases.
if context['current_package_carrier_type'] in ['fixed', 'base_on_rule']:
context['current_package_carrier_type'] = 'none'
return {
'name': _('Package Details'),
'type': 'ir.actions.act_window',
'view_mode': 'form',
'res_model': 'choose.delivery.package',
'view_id': view_id,
'views': [(view_id, 'form')],
'target': 'new',
'context': context,
}
def send_to_shipper(self):
self.ensure_one()
res = self.carrier_id.send_shipping(self)[0]
if self.carrier_id.free_over and self.sale_id:
amount_without_delivery = self.sale_id._compute_amount_total_without_delivery()
if self.carrier_id._compute_currency(self.sale_id, amount_without_delivery, 'pricelist_to_company') >= self.carrier_id.amount:
res['exact_price'] = 0.0
self.carrier_price = res['exact_price'] * (1.0 + (self.carrier_id.margin / 100.0))
if res['tracking_number']:
related_pickings = self.env['stock.picking'] if self.carrier_tracking_ref and res['tracking_number'] in self.carrier_tracking_ref else self
accessed_moves = previous_moves = self.move_ids.move_orig_ids
while previous_moves:
related_pickings |= previous_moves.picking_id
previous_moves = previous_moves.move_orig_ids - accessed_moves
accessed_moves |= previous_moves
accessed_moves = next_moves = self.move_ids.move_dest_ids
while next_moves:
related_pickings |= next_moves.picking_id
next_moves = next_moves.move_dest_ids - accessed_moves
accessed_moves |= next_moves
without_tracking = related_pickings.filtered(lambda p: not p.carrier_tracking_ref)
without_tracking.carrier_tracking_ref = res['tracking_number']
for p in related_pickings - without_tracking:
p.carrier_tracking_ref += "," + res['tracking_number']
order_currency = self.sale_id.currency_id or self.company_id.currency_id
msg = _(
"Shipment sent to carrier %(carrier_name)s for shipping with tracking number %(ref)s<br/>Cost: %(price).2f %(currency)s",
carrier_name=self.carrier_id.name,
ref=self.carrier_tracking_ref,
price=self.carrier_price,
currency=order_currency.name
)
self.message_post(body=msg)
self._add_delivery_cost_to_so()
def _check_carrier_details_compliance(self):
"""Hook to check if a delivery is compliant in regard of the carrier.
"""
return
def print_return_label(self):
self.ensure_one()
self.carrier_id.get_return_label(self)
def _get_matching_delivery_lines(self):
return self.sale_id.order_line.filtered(
lambda l: l.is_delivery
and l.currency_id.is_zero(l.price_unit)
and l.product_id == self.carrier_id.product_id
)
def _prepare_sale_delivery_line_vals(self):
return {
'price_unit': self.carrier_price,
# remove the estimated price from the description
'name': self.carrier_id.with_context(lang=self.partner_id.lang).name,
}
def _add_delivery_cost_to_so(self):
self.ensure_one()
sale_order = self.sale_id
if sale_order and self.carrier_id.invoice_policy == 'real' and self.carrier_price:
delivery_lines = self._get_matching_delivery_lines()
if not delivery_lines:
delivery_lines = sale_order._create_delivery_line(self.carrier_id, self.carrier_price)
vals = self._prepare_sale_delivery_line_vals()
delivery_lines[0].write(vals)
def open_website_url(self):
self.ensure_one()
if not self.carrier_tracking_url:
raise UserError(_("Your delivery method has no redirect on courier provider's website to track this order."))
carrier_trackers = []
try:
carrier_trackers = json.loads(self.carrier_tracking_url)
except ValueError:
carrier_trackers = self.carrier_tracking_url
else:
msg = "Tracking links for shipment: <br/>"
for tracker in carrier_trackers:
msg += '<a href=' + tracker[1] + '>' + tracker[0] + '</a><br/>'
self.message_post(body=msg)
return self.env["ir.actions.actions"]._for_xml_id("delivery.act_delivery_trackers_url")
client_action = {
'type': 'ir.actions.act_url',
'name': "Shipment Tracking Page",
'target': 'new',
'url': self.carrier_tracking_url,
}
return client_action
def cancel_shipment(self):
for picking in self:
picking.carrier_id.cancel_shipment(self)
msg = "Shipment %s cancelled" % picking.carrier_tracking_ref
picking.message_post(body=msg)
picking.carrier_tracking_ref = False
def _get_estimated_weight(self):
self.ensure_one()
weight = 0.0
for move in self.move_ids:
weight += move.product_qty * move.product_id.weight
return weight
def _should_generate_commercial_invoice(self):
self.ensure_one()
return self.picking_type_id.warehouse_id.partner_id.country_id != self.partner_id.country_id
class StockReturnPicking(models.TransientModel):
_inherit = 'stock.return.picking'
def _create_returns(self):
# Prevent copy of the carrier and carrier price when generating return picking
# (we have no integration of returns for now)
new_picking, pick_type_id = super(StockReturnPicking, self)._create_returns()
picking = self.env['stock.picking'].browse(new_picking)
picking.write({'carrier_id': False,
'carrier_price': 0.0})
return new_picking, pick_type_id