Odoo18-Base/odoo/cli/obfuscate.py
2025-03-04 11:07:12 +07:00

256 lines
12 KiB
Python

# Part of Odoo. See LICENSE file for full copyright and licensing details.
import functools
import logging
import optparse
import sys
from collections import defaultdict

import odoo
from odoo.modules.registry import Registry
from odoo.tools import SQL

from . import Command
_logger = logging.getLogger(__name__)
class Obfuscate(Command):
    """Obfuscate data in a given odoo database"""

    # Columns processed by default. ``--fields`` / ``--file`` extend this list
    # and ``--exclude`` removes entries from it.
    _DEFAULT_FIELDS = (
        ('mail_tracking_value', 'old_value_char'),
        ('mail_tracking_value', 'old_value_text'),
        ('mail_tracking_value', 'new_value_char'),
        ('mail_tracking_value', 'new_value_text'),
        ('res_partner', 'name'),
        ('res_partner', 'complete_name'),
        ('res_partner', 'email'),
        ('res_partner', 'phone'),
        ('res_partner', 'mobile'),
        ('res_partner', 'street'),
        ('res_partner', 'street2'),
        ('res_partner', 'city'),
        ('res_partner', 'zip'),
        ('res_partner', 'vat'),
        ('res_partner', 'website'),
        ('res_country', 'name'),
        ('mail_message', 'subject'),
        ('mail_message', 'email_from'),
        ('mail_message', 'reply_to'),
        ('mail_message', 'body'),
        ('crm_lead', 'name'),
        ('crm_lead', 'contact_name'),
        ('crm_lead', 'partner_name'),
        ('crm_lead', 'email_from'),
        ('crm_lead', 'phone'),
        ('crm_lead', 'mobile'),
        ('crm_lead', 'website'),
        ('crm_lead', 'description'),
    )

    def __init__(self):
        super().__init__()
        # All three attributes are populated by run().
        self.cr = None
        self.dbname = None
        self.registry = None

    def _ensure_cr(func):
        """Decorator: raise if the method is called before ``self.cr`` is set."""
        @functools.wraps(func)
        def check_cr(self, *args, **kwargs):
            if not self.cr:
                raise Exception("No database connection")
            return func(self, *args, **kwargs)
        return check_cr

    @_ensure_cr
    def begin(self):
        """Open a transaction and make sure the pgcrypto extension is installed."""
        self.cr.execute("begin work")
        self.cr.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")

    @_ensure_cr
    def commit(self):
        """Commit the current transaction on the cursor."""
        self.cr.commit()

    @_ensure_cr
    def rollback(self):
        """Roll back the current transaction on the cursor."""
        self.cr.rollback()

    @_ensure_cr
    def set_pwd(self, pwd):
        """Set password to cypher/uncypher datas"""
        # The password is stored encrypted with itself; ON CONFLICT keeps an
        # already-stored password untouched.
        self.cr.execute("INSERT INTO ir_config_parameter (key, value) VALUES ('odoo_cyph_pwd', 'odoo_cyph_'||encode(pgp_sym_encrypt(%s, %s), 'base64')) ON CONFLICT(key) DO NOTHING", [pwd, pwd])

    @_ensure_cr
    def check_pwd(self, pwd):
        """If password is set, check if it's valid

        :param pwd: password supplied on the command line
        :return: True when no password is stored yet or when the stored one
            decrypts to ``pwd``; False otherwise, including when the query
            raises (the error is logged).
        """
        uncypher_pwd = self.uncypher_string(SQL.identifier('value'), pwd)
        try:
            query = SQL("SELECT %s FROM ir_config_parameter WHERE key='odoo_cyph_pwd'", uncypher_pwd)
            self.cr.execute(query)
            if self.cr.rowcount == 0 or (self.cr.rowcount == 1 and self.cr.fetchone()[0] == pwd):
                return True
        except Exception as e:  # noqa: BLE001
            _logger.error("Error checking password: %s", e)
        return False

    @_ensure_cr
    def clear_pwd(self):
        """Unset password to cypher/uncypher datas"""
        self.cr.execute("DELETE FROM ir_config_parameter WHERE key='odoo_cyph_pwd' ")

    def cypher_string(self, sql_field: SQL, password):
        """Return the SQL expression encrypting ``sql_field`` with ``password``.

        Values already starting with the ``odoo_cyph_`` marker are returned
        unchanged by the expression.
        """
        # don't double cypher fields
        return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN %(field_name)s ELSE 'odoo_cyph_'||encode(pgp_sym_encrypt(%(field_name)s, %(pwd)s), 'base64') END""", field_name=sql_field, pwd=password)

    def uncypher_string(self, sql_field: SQL, password):
        """Return the SQL expression decrypting ``sql_field`` with ``password``.

        Values that do not start with the ``odoo_cyph_`` marker are returned
        unchanged by the expression.
        """
        # substring(..., 11) skips the 10-character 'odoo_cyph_' marker.
        return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN pgp_sym_decrypt(decode(substring(%(field_name)s, 11)::text, 'base64'), %(pwd)s) ELSE %(field_name)s END""", field_name=sql_field, pwd=password)

    @_ensure_cr
    def check_field(self, table, field):
        """Classify a column by its PostgreSQL type.

        :return: ``'string'`` for text/varchar columns, ``'json'`` for jsonb
            columns, ``False`` when the column is not found exactly once or
            has any other type.
        """
        qry = "SELECT udt_name FROM information_schema.columns WHERE table_name=%s AND column_name=%s"
        self.cr.execute(qry, [table, field])
        if self.cr.rowcount == 1:
            res = self.cr.fetchone()
            if res[0] in ['text', 'varchar']:
                # Doesn't work for selection fields ...
                return 'string'
            if res[0] == 'jsonb':
                return 'json'
        return False

    @_ensure_cr
    def get_all_fields(self):
        """Return ``(table, column)`` rows for every text/varchar/jsonb column
        of the public schema whose table name does not match ``ir_%``."""
        # An SQL value list must be parenthesised; square brackets are a syntax error.
        qry = "SELECT table_name, column_name FROM information_schema.columns WHERE table_schema='public' AND udt_name IN ('text', 'varchar', 'jsonb') AND NOT table_name LIKE 'ir_%' ORDER BY 1,2"
        self.cr.execute(qry)
        return self.cr.fetchall()

    @_ensure_cr
    def convert_table(self, table, fields, pwd, with_commit=False, unobfuscate=False):
        """Obfuscate (or unobfuscate) the given columns of one table in a single UPDATE.

        :param table: table name
        :param fields: iterable of column names; columns that check_field()
            does not recognise are ignored
        :param pwd: cypher password
        :param with_commit: commit right after the UPDATE and reopen a transaction
        :param unobfuscate: decrypt instead of encrypt
        """
        cypherings = []
        cyph_fct = self.uncypher_string if unobfuscate else self.cypher_string

        for field in fields:
            field_type = self.check_field(table, field)
            sql_field = SQL.identifier(field)
            if field_type == 'string':
                cypher_query = cyph_fct(sql_field, pwd)
                cypherings.append(SQL('%s=%s', sql_field, cypher_query))
            elif field_type == 'json':
                # List every key
                # Loop on keys
                # Nest the jsonb_set calls to update all values at once
                # Do not create the key in json if it doesn't exist
                new_field_value = sql_field
                self.cr.execute(SQL('select distinct jsonb_object_keys(%s) as key from %s', sql_field, SQL.identifier(table)))
                keys = [k[0] for k in self.cr.fetchall()]
                for key in keys:
                    cypher_query = cyph_fct(SQL("%s->>%s", sql_field, key), pwd)
                    # jsonb_set() returns NULL when its new value is NULL, which
                    # would wipe the whole column for rows where this key is
                    # missing or holds JSON null. Falling back to a JSON null
                    # leaves such rows unchanged (create_if_missing is FALSE).
                    new_field_value = SQL(
                        """jsonb_set(%s, array[%s], COALESCE(to_jsonb(%s)::jsonb, 'null'::jsonb), FALSE)""",
                        new_field_value, key, cypher_query
                    )
                cypherings.append(SQL('%s=%s', sql_field, new_field_value))

        if cypherings:
            query = SQL("UPDATE %s SET %s", SQL.identifier(table), SQL(',').join(cypherings))
            self.cr.execute(query)
            if with_commit:
                self.commit()
                self.begin()

    def confirm_not_secure(self):
        """Ask for interactive confirmation before altering the database.

        Rolls back and exits the process (status 0) unless the user answers
        ``y`` and then types the database name in upper case.

        :return: True once both confirmations have been given
        """
        _logger.info("The obfuscate method is not considered as safe to transfer anonymous datas to a third party.")
        conf_y = input(f"This will alter data in the database {self.dbname} and can lead to a data loss. Would you like to proceed [y/N]? ")
        if conf_y.upper() != 'Y':
            self.rollback()
            sys.exit(0)
        conf_db = input(f"Please type your database name ({self.dbname}) in UPPERCASE to confirm you understand this operation is not considered secure : ")
        if self.dbname.upper() != conf_db:
            self.rollback()
            sys.exit(0)
        return True

    @staticmethod
    def _parse_field_specs(specs):
        """Turn ``table.column`` strings into ``(table, column)`` tuples.

        Surrounding whitespace is stripped and empty entries (e.g. blank lines
        of a file) are skipped. Entries that are not exactly ``table.column``
        are reported through the logger and skipped.

        :param specs: iterable of strings
        :return: list of ``(table, column)`` tuples
        """
        parsed = []
        for raw_spec in specs:
            spec = raw_spec.strip()
            if not spec:
                continue
            table, dot, column = spec.partition('.')
            if not dot or not table or not column or '.' in column:
                _logger.error("Invalid field specification: %s", spec)
                continue
            parsed.append((table, column))
        return parsed

    def _collect_fields(self, opt):
        """Build the list of ``(table, column)`` pairs to process from the parsed options."""
        fields = list(self._DEFAULT_FIELDS)

        if opt.fields:
            if not opt.allfields:
                fields += self._parse_field_specs(opt.fields.split(','))
            else:
                _logger.error("--allfields option is set, ignoring --fields option")
        if opt.file:
            with open(opt.file, encoding='utf-8') as f:
                fields += self._parse_field_specs(f)
        if opt.exclude:
            if not opt.allfields:
                # Parsed once into a set instead of once per candidate field.
                excluded = set(self._parse_field_specs(opt.exclude.split(',')))
                fields = [f for f in fields if f not in excluded]
            else:
                _logger.error("--allfields option is set, ignoring --exclude option")

        if opt.allfields:
            return self.get_all_fields()

        # Keep only the columns that exist with a supported type.
        valid_fields, invalid_fields = [], []
        for table, column in fields:
            target = valid_fields if self.check_field(table, column) else invalid_fields
            target.append((table, column))
        if invalid_fields:
            _logger.error("Invalid fields: %s", ', '.join([f"{f[0]}.{f[1]}" for f in invalid_fields]))
        return valid_fields

    def run(self, cmdargs):
        """Entry point of the command: parse ``cmdargs`` and (un)obfuscate the database.

        Exits the process with an error message when a required option is
        missing or when any exception is raised while processing.
        """
        parser = odoo.tools.config.parser
        group = optparse.OptionGroup(parser, "Obfuscate Configuration")
        group.add_option('--pwd', dest="pwd", default=False, help="Cypher password")
        group.add_option('--fields', dest="fields", default=False, help="List of table.columns to obfuscate/unobfuscate: table1.column1,table2.column1,table2.column2")
        group.add_option('--exclude', dest="exclude", default=False, help="List of table.columns to exclude from obfuscate/unobfuscate: table1.column1,table2.column1,table2.column2")
        group.add_option('--file', dest="file", default=False, help="File containing the list of table.columns to obfuscate/unobfuscate")
        group.add_option('--unobfuscate', action='store_true', default=False)
        group.add_option('--allfields', action='store_true', default=False, help="Used in unobfuscate mode, try to unobfuscate all fields. Cannot be used in obfuscate mode. Slower than specifying fields.")
        group.add_option('--vacuum', action='store_true', default=False, help="Vacuum database after unobfuscating")
        group.add_option('--pertablecommit', action='store_true', default=False, help="Commit after each table instead of a big transaction")
        group.add_option(
            '-y', '--yes', dest="yes", action='store_true', default=False,
            help="Don't ask for manual confirmation. Use it carefully as the obfuscate method is not considered as safe to transfer anonymous datas to a third party.")
        parser.add_option_group(group)

        if not cmdargs:
            sys.exit(parser.print_help())

        try:
            opt = odoo.tools.config.parse_config(cmdargs, setup_logging=True)
            if not opt.pwd:
                _logger.error("--pwd is required")
                sys.exit("ERROR: --pwd is required")
            if opt.allfields and not opt.unobfuscate:
                _logger.error("--allfields can only be used in unobfuscate mode")
                sys.exit("ERROR: --allfields can only be used in unobfuscate mode")

            self.dbname = odoo.tools.config['db_name']
            self.registry = Registry(self.dbname)
            with self.registry.cursor() as cr:
                self.cr = cr
                self.begin()
                if not self.check_pwd(opt.pwd):
                    # Wrong password (or the check itself failed): change nothing.
                    self.rollback()
                    return

                fields = self._collect_fields(opt)

                if not opt.unobfuscate and not opt.yes:
                    self.confirm_not_secure()

                _logger.info("Processing fields: %s", ', '.join([f"{f[0]}.{f[1]}" for f in fields]))
                # Group columns per table so each table gets a single UPDATE;
                # ir_* tables are never touched.
                tables = defaultdict(set)
                for t, f in fields:
                    if t[0:3] != 'ir_' and '.' not in t:
                        tables[t].add(f)

                if opt.unobfuscate:
                    _logger.info("Unobfuscating datas")
                    for table in tables:
                        _logger.info("Unobfuscating table %s", table)
                        self.convert_table(table, tables[table], opt.pwd, opt.pertablecommit, True)

                    if opt.vacuum:
                        _logger.info("Vacuuming obfuscated tables")
                        for table in tables:
                            _logger.debug("Vacuuming table %s", table)
                            # NOTE(review): PostgreSQL refuses VACUUM inside a
                            # transaction block; confirm this cursor is not in
                            # one at this point.
                            self.cr.execute(SQL("VACUUM FULL %s", SQL.identifier(table)))
                    self.clear_pwd()
                else:
                    _logger.info("Obfuscating datas")
                    self.set_pwd(opt.pwd)
                    for table in tables:
                        _logger.info("Obfuscating table %s", table)
                        self.convert_table(table, tables[table], opt.pwd, opt.pertablecommit)

                self.commit()
        except Exception as e:  # noqa: BLE001
            sys.exit("ERROR: %s" % e)