Odoo18-Base/addons/bus/tests/test_websocket_controller.py
2025-03-10 11:12:23 +07:00

93 lines
3.4 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import json
from odoo.addons.base.tests.common import HttpCaseWithUserDemo
class TestWebsocketController(HttpCaseWithUserDemo):
def _make_rpc(self, route, params, headers=None):
data = json.dumps({
'id': 0,
'jsonrpc': '2.0',
'method': 'call',
'params': params,
}).encode()
headers = headers or {}
headers['Content-Type'] = 'application/json'
return self.url_open(route, data, headers=headers)
def test_websocket_peek(self):
response = json.loads(
self._make_rpc('/websocket/peek_notifications', {
'channels': [],
'last': 0,
'is_first_poll': True,
}).content.decode()
)
# Response containing channels/notifications is retrieved and is
# conform to excpectations.
result = response.get('result')
self.assertIsNotNone(result)
channels = result.get('channels')
self.assertIsNotNone(channels)
self.assertIsInstance(channels, list)
notifications = result.get('notifications')
self.assertIsNotNone(notifications)
self.assertIsInstance(notifications, list)
response = json.loads(
self._make_rpc('/websocket/peek_notifications', {
'channels': [],
'last': 0,
'is_first_poll': False,
}).content.decode()
)
# Reponse is received as long as the session is valid.
self.assertIn('result', response)
def test_websocket_peek_session_expired_login(self):
session = self.authenticate(None, None)
# first rpc should be fine
self._make_rpc('/websocket/peek_notifications', {
'channels': [],
'last': 0,
'is_first_poll': True,
})
self.authenticate('admin', 'admin')
# rpc with outdated session should lead to error.
headers = {'Cookie': f'session_id={session.sid};'}
response = json.loads(
self._make_rpc('/websocket/peek_notifications', {
'channels': [],
'last': 0,
'is_first_poll': False,
}, headers=headers).content.decode()
)
error = response.get('error')
self.assertIsNotNone(error, 'Sending a poll with an outdated session should lead to error')
self.assertEqual('odoo.http.SessionExpiredException', error['data']['name'])
def test_websocket_peek_session_expired_logout(self):
session = self.authenticate('demo', 'demo')
# first rpc should be fine
self._make_rpc('/websocket/peek_notifications', {
'channels': [],
'last': 0,
'is_first_poll': True,
})
self.url_open('/web/session/logout')
# rpc with outdated session should lead to error.
headers = {'Cookie': f'session_id={session.sid};'}
response = json.loads(
self._make_rpc('/websocket/peek_notifications', {
'channels': [],
'last': 0,
'is_first_poll': False,
}, headers=headers).content.decode()
)
error = response.get('error')
self.assertIsNotNone(error, 'Sending a poll with an outdated session should lead to error')
self.assertEqual('odoo.http.SessionExpiredException', error['data']['name'])