Odoo18-Base/odoo/addons/base/models/res_device.py
2025-01-06 10:57:38 +07:00

204 lines
8.0 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
from contextlib import nullcontext
from datetime import datetime
import logging
from odoo import api, fields, models, tools
from odoo.http import GeoIP, request, root
from odoo.tools import SQL, OrderedSet, unique
from odoo.tools.translate import _
from .res_users import check_identity
# Module-level logger named after this module; the models below use it to
# report device-log insertions, garbage collection and device revocations.
_logger = logging.getLogger(__name__)
class ResDeviceLog(models.Model):
    """Raw log of device activity, keyed by session identifier.

    Rows are inserted by :meth:`_update_device` with raw SQL and pruned by the
    :meth:`_gc_device_log` autovacuum job.
    """
    _name = 'res.device.log'
    _description = 'Device Log'
    _rec_names_search = ['platform', 'browser']

    # Identifier of the session; `_update_device` stores the first 42
    # characters of the session sid here.
    session_identifier = fields.Char("Session Identifier", required=True, index='btree')
    platform = fields.Char("Platform")
    browser = fields.Char("Browser")
    ip_address = fields.Char("IP Address")
    country = fields.Char("Country")
    city = fields.Char("City")
    device_type = fields.Selection([('computer', 'Computer'), ('mobile', 'Mobile')], "Device Type")
    user_id = fields.Many2one("res.users", index='btree')
    first_activity = fields.Datetime("First Activity")
    last_activity = fields.Datetime("Last Activity", index='btree')
    revoked = fields.Boolean(
        "Revoked",
        help="If True, the session file corresponding to this device\n"
             "no longer exists on the filesystem.")
    is_current = fields.Boolean("Current Device", compute="_compute_is_current")
    linked_ip_addresses = fields.Text("Linked IP address", compute="_compute_linked_ip_addresses")

    def init(self):
        """Create (if missing) the composite partial index on non-revoked rows."""
        self.env.cr.execute(SQL(
            "\n"
            "CREATE INDEX IF NOT EXISTS res_device_log__composite_idx ON %s\n"
            "(user_id, session_identifier, platform, browser, last_activity, id) WHERE revoked = False\n",
            SQL.identifier(self._table)
        ))

    def _compute_display_name(self):
        """Display name is "<Platform> <Browser>", each capitalized.

        A missing platform or browser is replaced by the translated "Unknown".
        """
        for device in self:
            platform = device.platform or _("Unknown")
            browser = device.browser or _("Unknown")
            device.display_name = f"{platform.capitalize()} {browser.capitalize()}"

    def _compute_is_current(self):
        """Flag the devices whose session identifier prefixes the current session sid.

        Outside of a request, or when either the sid or the identifier is
        empty, the device is not current. The identifier guard avoids calling
        ``str.startswith`` with a falsy (non-string) field value.
        """
        current_sid = request.session.sid if request else None
        for device in self:
            identifier = device.session_identifier
            device.is_current = bool(
                current_sid and identifier and current_sid.startswith(identifier)
            )

    def _compute_linked_ip_addresses(self):
        """Collect the distinct IP addresses logged for each device.

        Logs are grouped by (session_identifier, platform, browser); the
        addresses of the matching group are de-duplicated in aggregation order
        and joined with newlines. Empty addresses are skipped so that a NULL
        ``ip_address`` in the aggregate cannot break the join.
        """
        device_group_map = {}
        for *device_info, ip_array in self.env['res.device.log']._read_group(
            domain=[('session_identifier', 'in', self.mapped('session_identifier'))],
            groupby=['session_identifier', 'platform', 'browser'],
            aggregates=['ip_address:array_agg']
        ):
            device_group_map[tuple(device_info)] = ip_array
        for device in self:
            ip_addresses = device_group_map.get(
                (device.session_identifier, device.platform, device.browser), []
            )
            device.linked_ip_addresses = '\n'.join(
                OrderedSet(ip for ip in ip_addresses if ip)
            )

    def _order_field_to_sql(self, alias, field_name, direction, nulls, query):
        """Allow ordering on the non-stored ``is_current`` field.

        When a request is available, rows whose ``session_identifier`` equals
        the first 42 characters of the current session sid are sorted first;
        ``direction`` and ``nulls`` are not applied in that case. Every other
        field is delegated to the parent implementation.
        """
        if field_name == 'is_current' and request:
            return SQL("session_identifier = %s DESC", request.session.sid[:42])
        return super()._order_field_to_sql(alias, field_name, direction, nulls, query)

    def _is_mobile(self, platform):
        """Return True when ``platform`` (case-insensitive) is a known mobile platform.

        :param platform: platform name, possibly empty
        :return: bool
        """
        if not platform:
            return False
        mobile_platform = ['android', 'iphone', 'ipad', 'ipod', 'blackberry', 'windows phone', 'webos']
        return platform.lower() in mobile_platform

    @api.model
    def _update_device(self, request):
        """
        Must be called when we want to update the device for the current request.
        Passage through this method must leave a "trace" in the session.

        Nothing is inserted when the session reports no trace to record.

        :param request: Request or WebsocketRequest object
        """
        trace = request.session.update_trace(request)
        if not trace:
            return
        geoip = GeoIP(trace['ip_address'])
        user_id = request.session.uid
        session_identifier = request.session.sid[:42]
        if self.env.cr.readonly:
            # The current cursor cannot write: roll it back and insert through
            # a dedicated read-write cursor instead.
            self.env.cr.rollback()
            cursor = self.env.registry.cursor(readonly=False)
        else:
            cursor = nullcontext(self.env.cr)
        with cursor as cr:
            cr.execute(SQL(
                "\n"
                "INSERT INTO res_device_log (session_identifier, platform, browser, ip_address, country, city, device_type, user_id, first_activity, last_activity, revoked)\n"
                "VALUES (%(session_identifier)s, %(platform)s, %(browser)s, %(ip_address)s, %(country)s, %(city)s, %(device_type)s, %(user_id)s, %(first_activity)s, %(last_activity)s, %(revoked)s)\n",
                session_identifier=session_identifier,
                platform=trace['platform'],
                browser=trace['browser'],
                ip_address=trace['ip_address'],
                country=geoip.get('country_name'),
                city=geoip.get('city'),
                device_type='mobile' if self._is_mobile(trace['platform']) else 'computer',
                user_id=user_id,
                first_activity=datetime.fromtimestamp(trace['first_activity']),
                last_activity=datetime.fromtimestamp(trace['last_activity']),
                revoked=False,
            ))
        # %s rather than %d: the session uid may be None, which %d cannot format.
        _logger.info("User %s inserts device log (%s)", user_id, session_identifier)

    @api.autovacuum
    def _gc_device_log(self):
        """Delete every log superseded by a more recent log of the same device.

        Two logs describe the same device when session identifier, platform,
        browser and IP address all match. The nullable columns are compared
        with ``IS NOT DISTINCT FROM`` so that logs with a NULL platform,
        browser or IP address are collected too (with ``=`` they never match).
        """
        # Keep the last device log
        # (even if the session file no longer exists on the filesystem)
        self.env.cr.execute(
            "\n"
            "DELETE FROM res_device_log log1\n"
            "WHERE EXISTS (\n"
            "SELECT 1 FROM res_device_log log2\n"
            "WHERE\n"
            "log1.session_identifier = log2.session_identifier\n"
            "AND log1.platform IS NOT DISTINCT FROM log2.platform\n"
            "AND log1.browser IS NOT DISTINCT FROM log2.browser\n"
            "AND log1.ip_address IS NOT DISTINCT FROM log2.ip_address\n"
            "AND log1.last_activity < log2.last_activity\n"
            ")\n"
        )
        _logger.info("GC device logs delete %d entries", self.env.cr.rowcount)
class ResDevice(models.Model):
    """SQL view over ``res_device_log`` exposing one non-revoked row per device.

    For each (user, session identifier, platform, browser) only the most
    recent non-revoked log is kept; ties on ``last_activity`` are broken by
    the highest id.
    """
    _name = "res.device"
    _inherit = ["res.device.log"]
    _description = "Devices"
    _auto = False
    _order = 'last_activity desc'

    @check_identity
    def revoke(self):
        """Revoke the selected devices; wrapped by the identity check decorator."""
        return self._revoke()

    def _revoke(self):
        """Delete the sessions of these devices and flag all their logs as revoked.

        The current session is logged out when one of the devices is the
        current one.
        """
        DeviceLog = self.env['res.device.log']
        identifiers = list(unique(record.session_identifier for record in self))

        # Remove the sessions from the session store first, then flag the logs.
        root.session_store.delete_from_identifiers(identifiers)
        matching_logs = DeviceLog.sudo().search([('session_identifier', 'in', identifiers)])
        matching_logs.write({'revoked': True})
        _logger.info("User %d revokes devices (%s)", self.env.uid, ', '.join(identifiers))

        if self.filtered('is_current'):
            request.session.logout()

    @api.model
    def _select(self):
        """Return the SELECT clause of the view query."""
        return "SELECT D.*"

    @api.model
    def _from(self):
        """Return the FROM clause of the view query."""
        return "FROM res_device_log D"

    @api.model
    def _where(self):
        """Return the WHERE clause of the view query.

        Keeps a non-revoked log only when no other non-revoked log of the same
        user, session, platform and browser is more recent (or equally recent
        with a higher id).
        """
        return (
            "\n"
            "WHERE\n"
            "NOT EXISTS (\n"
            "SELECT 1\n"
            "FROM res_device_log D2\n"
            "WHERE\n"
            "D2.user_id = D.user_id\n"
            "AND D2.session_identifier = D.session_identifier\n"
            "AND D2.platform IS NOT DISTINCT FROM D.platform\n"
            "AND D2.browser IS NOT DISTINCT FROM D.browser\n"
            "AND (\n"
            "D2.last_activity > D.last_activity\n"
            "OR (D2.last_activity = D.last_activity AND D2.id > D.id)\n"
            ")\n"
            "AND D2.revoked = False\n"
            ")\n"
            "AND D.revoked = False\n"
        )

    @property
    def _query(self):
        """Full view query: SELECT, FROM and WHERE clauses separated by spaces."""
        return f"{self._select()} {self._from()} {self._where()}"

    def init(self):
        """Drop the existing view, if any, and (re)create it from ``_query``."""
        cr = self.env.cr
        tools.drop_view_if_exists(cr, self._table)
        view_name = SQL.identifier(self._table)
        view_body = SQL(self._query)
        cr.execute(SQL(
            "\n"
            "CREATE or REPLACE VIEW %s as (%s)\n",
            view_name,
            view_body
        ))