256 lines
12 KiB
Python
256 lines
12 KiB
Python
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
|
import odoo
|
|
import sys
|
|
import optparse
|
|
import logging
|
|
|
|
from collections import defaultdict
|
|
|
|
from . import Command
|
|
from odoo.modules.registry import Registry
|
|
from odoo.tools import SQL
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Obfuscate(Command):
|
|
"""Obfuscate data in a given odoo database"""
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.cr = None
|
|
|
|
def _ensure_cr(func):
|
|
def check_cr(self, *args, **kwargs):
|
|
if not self.cr:
|
|
raise Exception("No database connection")
|
|
return func(self, *args, **kwargs)
|
|
return check_cr
|
|
|
|
@_ensure_cr
|
|
def begin(self):
|
|
self.cr.execute("begin work")
|
|
self.cr.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")
|
|
|
|
@_ensure_cr
|
|
def commit(self):
|
|
self.cr.commit()
|
|
|
|
@_ensure_cr
|
|
def rollback(self):
|
|
self.cr.rollback()
|
|
|
|
@_ensure_cr
|
|
def set_pwd(self, pwd):
|
|
"""Set password to cypher/uncypher datas"""
|
|
self.cr.execute("INSERT INTO ir_config_parameter (key, value) VALUES ('odoo_cyph_pwd', 'odoo_cyph_'||encode(pgp_sym_encrypt(%s, %s), 'base64')) ON CONFLICT(key) DO NOTHING", [pwd, pwd])
|
|
|
|
@_ensure_cr
|
|
def check_pwd(self, pwd):
|
|
"""If password is set, check if it's valid"""
|
|
uncypher_pwd = self.uncypher_string(SQL.identifier('value'), pwd)
|
|
|
|
try:
|
|
query = SQL("SELECT %s FROM ir_config_parameter WHERE key='odoo_cyph_pwd'", uncypher_pwd)
|
|
self.cr.execute(query)
|
|
if self.cr.rowcount == 0 or (self.cr.rowcount == 1 and self.cr.fetchone()[0] == pwd):
|
|
return True
|
|
except Exception as e: # noqa: BLE001
|
|
_logger.error("Error checking password: %s", e)
|
|
return False
|
|
|
|
@_ensure_cr
|
|
def clear_pwd(self):
|
|
"""Unset password to cypher/uncypher datas"""
|
|
self.cr.execute("DELETE FROM ir_config_parameter WHERE key='odoo_cyph_pwd' ")
|
|
|
|
def cypher_string(self, sql_field: SQL, password):
|
|
# don't double cypher fields
|
|
return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN %(field_name)s ELSE 'odoo_cyph_'||encode(pgp_sym_encrypt(%(field_name)s, %(pwd)s), 'base64') END""", field_name=sql_field, pwd=password)
|
|
|
|
def uncypher_string(self, sql_field: SQL, password):
|
|
return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN pgp_sym_decrypt(decode(substring(%(field_name)s, 11)::text, 'base64'), %(pwd)s) ELSE %(field_name)s END""", field_name=sql_field, pwd=password)
|
|
|
|
def check_field(self, table, field):
|
|
qry = "SELECT udt_name FROM information_schema.columns WHERE table_name=%s AND column_name=%s"
|
|
self.cr.execute(qry, [table, field])
|
|
if self.cr.rowcount == 1:
|
|
res = self.cr.fetchone()
|
|
if res[0] in ['text', 'varchar']:
|
|
# Doesn t work for selection fields ...
|
|
return 'string'
|
|
if res[0] == 'jsonb':
|
|
return 'json'
|
|
return False
|
|
|
|
def get_all_fields(self):
|
|
qry = "SELECT table_name, column_name FROM information_schema.columns WHERE table_schema='public' AND udt_name IN ['text', 'varchar', 'jsonb'] AND NOT table_name LIKE 'ir_%' ORDER BY 1,2"
|
|
self.cr.execute(qry)
|
|
return self.cr.fetchall()
|
|
|
|
def convert_table(self, table, fields, pwd, with_commit=False, unobfuscate=False):
|
|
cypherings = []
|
|
cyph_fct = self.uncypher_string if unobfuscate else self.cypher_string
|
|
|
|
for field in fields:
|
|
field_type = self.check_field(table, field)
|
|
sql_field = SQL.identifier(field)
|
|
if field_type == 'string':
|
|
cypher_query = cyph_fct(sql_field, pwd)
|
|
cypherings.append(SQL('%s=%s', SQL.identifier(field), cypher_query))
|
|
elif field_type == 'json':
|
|
# List every key
|
|
# Loop on keys
|
|
# Nest the jsonb_set calls to update all values at once
|
|
# Do not create the key in json if doesn't esist
|
|
new_field_value = sql_field
|
|
self.cr.execute(SQL('select distinct jsonb_object_keys(%s) as key from %s', sql_field, SQL.identifier(table)))
|
|
keys = [k[0] for k in self.cr.fetchall()]
|
|
for key in keys:
|
|
cypher_query = cyph_fct(SQL("%s->>%s", sql_field, key), pwd)
|
|
new_field_value = SQL(
|
|
"""jsonb_set(%s, array[%s], to_jsonb(%s)::jsonb, FALSE)""",
|
|
new_field_value, key, cypher_query
|
|
)
|
|
cypherings.append(SQL('%s=%s', sql_field, new_field_value))
|
|
|
|
if cypherings:
|
|
query = SQL("UPDATE %s SET %s", SQL.identifier(table), SQL(',').join(cypherings))
|
|
self.cr.execute(query)
|
|
if with_commit:
|
|
self.commit()
|
|
self.begin()
|
|
|
|
def confirm_not_secure(self):
|
|
_logger.info("The obfuscate method is not considered as safe to transfer anonymous datas to a third party.")
|
|
conf_y = input(f"This will alter data in the database {self.dbname} and can lead to a data loss. Would you like to proceed [y/N]? ")
|
|
if conf_y.upper() != 'Y':
|
|
self.rollback()
|
|
sys.exit(0)
|
|
conf_db = input(f"Please type your database name ({self.dbname}) in UPPERCASE to confirm you understand this operation is not considered secure : ")
|
|
if self.dbname.upper() != conf_db:
|
|
self.rollback()
|
|
sys.exit(0)
|
|
return True
|
|
|
|
def run(self, cmdargs):
|
|
parser = odoo.tools.config.parser
|
|
group = optparse.OptionGroup(parser, "Obfuscate Configuration")
|
|
group.add_option('--pwd', dest="pwd", default=False, help="Cypher password")
|
|
group.add_option('--fields', dest="fields", default=False, help="List of table.columns to obfuscate/unobfuscate: table1.column1,table2.column1,table2.column2")
|
|
group.add_option('--exclude', dest="exclude", default=False, help="List of table.columns to exclude from obfuscate/unobfuscate: table1.column1,table2.column1,table2.column2")
|
|
group.add_option('--file', dest="file", default=False, help="File containing the list of table.columns to obfuscate/unobfuscate")
|
|
group.add_option('--unobfuscate', action='store_true', default=False)
|
|
group.add_option('--allfields', action='store_true', default=False, help="Used in unobfuscate mode, try to unobfuscate all fields. Cannot be used in obfuscate mode. Slower than specifying fields.")
|
|
group.add_option('--vacuum', action='store_true', default=False, help="Vacuum database after unobfuscating")
|
|
group.add_option('--pertablecommit', action='store_true', default=False, help="Commit after each table instead of a big transaction")
|
|
group.add_option(
|
|
'-y', '--yes', dest="yes", action='store_true', default=False,
|
|
help="Don't ask for manual confirmation. Use it carefully as the obfuscate method is not considered as safe to transfer anonymous datas to a third party.")
|
|
|
|
parser.add_option_group(group)
|
|
if not cmdargs:
|
|
sys.exit(parser.print_help())
|
|
|
|
try:
|
|
opt = odoo.tools.config.parse_config(cmdargs, setup_logging=True)
|
|
if not opt.pwd:
|
|
_logger.error("--pwd is required")
|
|
sys.exit("ERROR: --pwd is required")
|
|
if opt.allfields and not opt.unobfuscate:
|
|
_logger.error("--allfields can only be used in unobfuscate mode")
|
|
sys.exit("ERROR: --allfields can only be used in unobfuscate mode")
|
|
self.dbname = odoo.tools.config['db_name']
|
|
self.registry = Registry(self.dbname)
|
|
with self.registry.cursor() as cr:
|
|
self.cr = cr
|
|
self.begin()
|
|
if self.check_pwd(opt.pwd):
|
|
fields = [
|
|
('mail_tracking_value', 'old_value_char'),
|
|
('mail_tracking_value', 'old_value_text'),
|
|
('mail_tracking_value', 'new_value_char'),
|
|
('mail_tracking_value', 'new_value_text'),
|
|
('res_partner', 'name'),
|
|
('res_partner', 'complete_name'),
|
|
('res_partner', 'email'),
|
|
('res_partner', 'phone'),
|
|
('res_partner', 'mobile'),
|
|
('res_partner', 'street'),
|
|
('res_partner', 'street2'),
|
|
('res_partner', 'city'),
|
|
('res_partner', 'zip'),
|
|
('res_partner', 'vat'),
|
|
('res_partner', 'website'),
|
|
('res_country', 'name'),
|
|
('mail_message', 'subject'),
|
|
('mail_message', 'email_from'),
|
|
('mail_message', 'reply_to'),
|
|
('mail_message', 'body'),
|
|
('crm_lead', 'name'),
|
|
('crm_lead', 'contact_name'),
|
|
('crm_lead', 'partner_name'),
|
|
('crm_lead', 'email_from'),
|
|
('crm_lead', 'phone'),
|
|
('crm_lead', 'mobile'),
|
|
('crm_lead', 'website'),
|
|
('crm_lead', 'description'),
|
|
]
|
|
|
|
if opt.fields:
|
|
if not opt.allfields:
|
|
fields += [tuple(f.split('.')) for f in opt.fields.split(',')]
|
|
else:
|
|
_logger.error("--allfields option is set, ignoring --fields option")
|
|
if opt.file:
|
|
with open(opt.file, encoding='utf-8') as f:
|
|
fields += [tuple(l.strip().split('.')) for l in f]
|
|
if opt.exclude:
|
|
if not opt.allfields:
|
|
fields = [f for f in fields if f not in [tuple(f.split('.')) for f in opt.exclude.split(',')]]
|
|
else:
|
|
_logger.error("--allfields option is set, ignoring --exclude option")
|
|
|
|
if opt.allfields:
|
|
fields = self.get_all_fields()
|
|
else:
|
|
invalid_fields = [f for f in fields if not self.check_field(f[0], f[1])]
|
|
if invalid_fields:
|
|
_logger.error("Invalid fields: %s", ', '.join([f"{f[0]}.{f[1]}" for f in invalid_fields]))
|
|
fields = [f for f in fields if f not in invalid_fields]
|
|
|
|
if not opt.unobfuscate and not opt.yes:
|
|
self.confirm_not_secure()
|
|
|
|
_logger.info("Processing fields: %s", ', '.join([f"{f[0]}.{f[1]}" for f in fields]))
|
|
tables = defaultdict(set)
|
|
|
|
for t, f in fields:
|
|
if t[0:3] != 'ir_' and '.' not in t:
|
|
tables[t].add(f)
|
|
|
|
if opt.unobfuscate:
|
|
_logger.info("Unobfuscating datas")
|
|
for table in tables:
|
|
_logger.info("Unobfuscating table %s", table)
|
|
self.convert_table(table, tables[table], opt.pwd, opt.pertablecommit, True)
|
|
|
|
if opt.vacuum:
|
|
_logger.info("Vacuuming obfuscated tables")
|
|
for table in tables:
|
|
_logger.debug("Vacuuming table %s", table)
|
|
self.cr.execute(SQL("VACUUM FULL %s", SQL.identifier(table)))
|
|
self.clear_pwd()
|
|
else:
|
|
_logger.info("Obfuscating datas")
|
|
self.set_pwd(opt.pwd)
|
|
for table in tables:
|
|
_logger.info("Obfuscating table %s", table)
|
|
self.convert_table(table, tables[table], opt.pwd, opt.pertablecommit)
|
|
|
|
self.commit()
|
|
else:
|
|
self.rollback()
|
|
|
|
except Exception as e: # noqa: BLE001
|
|
sys.exit("ERROR: %s" % e)
|