Odoo18-Base/addons/mail/controllers/webclient.py
2025-01-06 10:57:38 +07:00

97 lines
4.2 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
from werkzeug.exceptions import NotFound
from odoo import http
from odoo.http import request
from odoo.addons.mail.models.discuss.mail_guest import add_guest_to_context
from odoo.addons.mail.tools.discuss import Store
from odoo.osv import expression
class WebclientController(http.Controller):
    """Routes for the web client."""

    @http.route("/mail/action", methods=["POST"], type="json", auth="public")
    @add_guest_to_context
    def mail_action(self, **kwargs):
        """Execute actions and return data depending on request parameters.

        This is similar to /mail/data except this method can have side effects.

        :param kwargs: request parameters, forwarded unchanged to
            :meth:`_process_request`.
        :return: the value returned by :meth:`_process_request`.
        """
        return self._process_request(**kwargs)

    @http.route("/mail/data", methods=["POST"], type="json", auth="public", readonly=True)
    @add_guest_to_context
    def mail_data(self, **kwargs):
        """Return data depending on request parameters.

        This is similar to /mail/action except this method should be read-only.

        :param kwargs: request parameters, forwarded unchanged to
            :meth:`_process_request`.
        :return: the value returned by :meth:`_process_request`.
        """
        return self._process_request(**kwargs)

    def _process_request(self, **kwargs):
        """Dispatch the request parameters to the handlers matching the current user.

        A fresh ``Store`` is filled by, in order: the handler for every caller,
        the handler for non-public users, and the handler for internal users.

        :param kwargs: request parameters; the optional ``context`` entry is
            merged into the request context before any handler runs.
        :return: ``store.get_result()`` once all applicable handlers have run.
        """
        store = Store()
        # The ``{}`` default of ``dict.get`` only covers a missing key: an explicit
        # JSON ``null`` would reach ``**None`` and raise TypeError, hence ``or {}``.
        request.update_context(**(kwargs.get("context") or {}))
        self._process_request_for_all(store, **kwargs)
        if not request.env.user._is_public():
            self._process_request_for_logged_in_user(store, **kwargs)
        if request.env.user._is_internal():
            self._process_request_for_internal_user(store, **kwargs)
        return store.get_result()

    def _process_request_for_all(self, store, **kwargs):
        """Handle the parameters that any caller may send, public users included.

        When ``init_messaging`` is present in ``kwargs``:

        - a non-public user gets ``_init_messaging(store)`` called on its record
          obtained through ``sudo(False)``;
        - a public user must resolve to a guest from the context;
        - the channels having a ``is_self`` member whose ``fold_state`` is
          ``open``/``folded`` or whose ``rtc_inviting_session_id`` is set are
          added to the store, optionally restricted to the ``channel_types``
          given in the ``init_messaging`` value.

        :param store: the ``Store`` collecting the data to return.
        :param kwargs: request parameters.
        :raises NotFound: if the user is public and no guest is found in the
            context.
        """
        if "init_messaging" in kwargs:
            if not request.env.user._is_public():
                user = request.env.user.sudo(False)
                user._init_messaging(store)
            else:
                guest = request.env["mail.guest"]._get_guest_from_context()
                if not guest:
                    raise NotFound()
            # NOTE(review): the channel lookup below is assumed to apply to both
            # logged-in users and guests (block nesting was reconstructed) - confirm.
            member_domain = [
                ("is_self", "=", True),
                "|",
                ("fold_state", "in", ("open", "folded")),
                ("rtc_inviting_session_id", "!=", False),
            ]
            channels_domain = [("channel_member_ids", "any", member_domain)]
            init_params = kwargs["init_messaging"]
            # Tolerate a value that is not a mapping (e.g. JSON ``true`` or ``null``)
            # instead of failing on ``.get``: no channel type filter is applied then.
            channel_types = (
                init_params.get("channel_types") if isinstance(init_params, dict) else None
            )
            if channel_types:
                channels_domain = expression.AND(
                    [channels_domain, [("channel_type", "in", channel_types)]]
                )
            store.add(request.env["discuss.channel"].search(channels_domain))

    def _process_request_for_logged_in_user(self, store, **kwargs):
        """Handle the parameters reserved to non-public users.

        When ``failures`` is truthy, up to 100 notifications authored by the
        current user's partner with a ``bounce`` or ``exception`` status are
        searched, and their messages are handed to
        ``_message_notifications_to_store(store)``. Notifications whose message
        is a ``user_notification`` or has no ``model``/``res_id`` are excluded.

        :param store: the ``Store`` collecting the data to return.
        :param kwargs: request parameters.
        """
        if kwargs.get("failures"):
            domain = [
                ("author_id", "=", request.env.user.partner_id.id),
                ("notification_status", "in", ("bounce", "exception")),
                ("mail_message_id.message_type", "!=", "user_notification"),
                ("mail_message_id.model", "!=", False),
                ("mail_message_id.res_id", "!=", 0),
            ]
            # sudo as to not check ACL, which is far too costly
            # sudo: mail.notification - return only failures of current user as author
            notifications = request.env["mail.notification"].sudo().search(domain, limit=100)
            notifications.mail_message_id._message_notifications_to_store(store)

    def _process_request_for_internal_user(self, store, **kwargs):
        """Handle the parameters reserved to internal users.

        - ``systray_get_activities``: adds the activity groups of the user, the
          sum of their ``total_count`` and the last bus id to the store.
        - ``canned_responses``: adds the canned responses created by the user or
          whose ``group_ids`` contain one of the user's groups.

        :param store: the ``Store`` collecting the data to return.
        :param kwargs: request parameters.
        """
        if kwargs.get("systray_get_activities"):
            # sudo: bus.bus: reading non-sensitive last id
            bus_last_id = request.env["bus.bus"].sudo()._bus_last_id()
            groups = request.env["res.users"]._get_activity_groups()
            store.add(
                {
                    # Groups without a "total_count" entry count as zero.
                    "activityCounter": sum(group.get("total_count", 0) for group in groups),
                    "activity_counter_bus_id": bus_last_id,
                    "activityGroups": groups,
                }
            )
        if kwargs.get("canned_responses"):
            domain = [
                "|",
                ("create_uid", "=", request.env.user.id),
                ("group_ids", "in", request.env.user.groups_id.ids),
            ]
            store.add(request.env["mail.canned.response"].search(domain))