#!/usr/bin/env python """ Delete records from an Odoo database based on a model and domain filter. Usage: delete_records.py Example: delete_records.py mydb res.partner --domain "[('active', '=', False)]" --force """ import argparse import ast import json import multiprocessing as mp import os import sys from typing import Dict, List, Tuple from functools import partial import odoorpc import color_log # Default configuration DEFAULT_HOST = "localhost" DEFAULT_PORT = 8069 DEFAULT_USERNAME = "admin" DEFAULT_PASSWORD = "admin" DEFAULT_DOMAIN = "[]" DEFAULT_PROCESS_SIZE = min(mp.cpu_count() * 2, 32) # Dynamic default based on CPU CACHE_DIR = "cache" CHUNK_SIZE = 500 # Records per batch for search operations # Logging levels OK, FAIL, INFO, WARNING = 0, 1, 2, 3 def parse_arguments() -> argparse.Namespace: """Parse and validate command-line arguments.""" parser = argparse.ArgumentParser( description="Safely delete records from an Odoo model with referential integrity checks.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("db_name", help="Database name") parser.add_argument("base_model", help="Model to delete records from") parser.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host") parser.add_argument( "--port", type=int, default=DEFAULT_PORT, help="Odoo server port" ) parser.add_argument("--username", default=DEFAULT_USERNAME, help="Odoo username") parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Odoo password") parser.add_argument( "--domain", default=DEFAULT_DOMAIN, help="Domain filter as Python list" ) parser.add_argument( "--process-size", type=int, default=DEFAULT_PROCESS_SIZE, help="Number of parallel processes", ) parser.add_argument( "--chunk-size", type=int, default=CHUNK_SIZE, help="Records per batch for search operations", ) action_group = parser.add_mutually_exclusive_group() action_group.add_argument( "--force", action="store_true", help="Force delete with referential integrity bypass", ) parser.add_argument( "--refresh-cache", action="store_true", help="Refresh related models cache" ) parser.add_argument( "--dry-run", action="store_true", help="Simulate operations without making changes", ) parser.add_argument("--verbose", action="store_true", help="Show detailed output") args = parser.parse_args() # Validate domain syntax early try: ast.literal_eval(args.domain) except (ValueError, SyntaxError) as e: color_log.Show(FAIL, f"Invalid domain syntax: {e}") sys.exit(1) return args def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO: """Establish and verify Odoo connection.""" try: odoo = odoorpc.ODOO(args.host, port=args.port) if args.verbose: color_log.Show(INFO, f"Available databases: {odoo.db.list()}") odoo.login(args.db_name, args.username, args.password) color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}") return odoo except odoorpc.error.RPCError as e: color_log.Show(FAIL, f"Login failed: {e}") sys.exit(1) except Exception as e: color_log.Show(FAIL, f"Connection error: {e}") sys.exit(1) def get_related_fields( odoo: odoorpc.ODOO, args: argparse.Namespace ) -> Dict[str, List[str]]: """Retrieve related fields with cache management.""" cache_path = os.path.join(CACHE_DIR, args.db_name, f"{args.base_model}.cache.json") os.makedirs(os.path.dirname(cache_path), exist_ok=True) if not args.refresh_cache and os.path.exists(cache_path): with open(cache_path, "r") as f: color_log.Show(INFO, f"Loaded related models from cache: {args.base_model}") return json.load(f) color_log.Show(INFO, f"Building related models cache for {args.base_model}...") related = {} Model = odoo.env["ir.model"] model_ids = Model.search([("model", "!=", args.base_model)]) for model in Model.read(model_ids, ["model"]): try: fields = odoo.env[model["model"]].fields_get() related_fields = [ name for name, desc in fields.items() if desc.get("relation") == args.base_model and desc.get("type") in ["many2one", "many2many", "one2many"] ] if related_fields: related[model["model"]] = related_fields except Exception as e: if args.verbose: color_log.Show(WARNING, f"Skipping {model['model']}: {str(e)}") with open(cache_path, "w") as f: json.dump(related, f, indent=2) return related def chunker(seq: List[int], size: int) -> List[List[int]]: """Efficient batch generator.""" return [seq[pos : pos + size] for pos in range(0, len(seq), size)] def process_batch( args: argparse.Namespace, batch: List[int], related: Dict[str, List[str]] ) -> Tuple[int, int, int]: """Process a batch of records with proper error handling.""" deleted = archived = skipped = 0 odoo = connect_to_odoo(args) model = odoo.env[args.base_model] for record_id in batch: try: if args.dry_run: color_log.Show(INFO, f"[DRY-RUN] Would process record {record_id}") continue # Check references if not args.force: referenced = any( odoo.env[rel_model].search_count([(field, "=", record_id)]) for rel_model, fields in related.items() for field in fields ) if referenced: model.write([record_id], {"active": False}) archived += 1 color_log.Show(OK, f"Archived {args.base_model} ID {record_id}") continue else: model.unlink([record_id]) deleted += 1 color_log.Show(OK, f"Deleted {args.base_model} ID {record_id}") except odoorpc.error.RPCError as e: color_log.Show(WARNING, f"Error processing {record_id}: {e}") skipped += 1 except Exception as e: color_log.Show(WARNING, f"Unexpected error with {record_id}: {e}") skipped += 1 return deleted, archived, skipped def main(): """Main execution flow.""" args = parse_arguments() odoo = connect_to_odoo(args) # Validate model exists if args.base_model not in odoo.env: color_log.Show(FAIL, f"Model {args.base_model} does not exist") sys.exit(1) # Retrieve records domain = ast.literal_eval(args.domain) record_ids = odoo.env[args.base_model].search( domain, offset=0, limit=None, order="id" ) if not record_ids: color_log.Show( WARNING, f"No records found in {args.base_model} with domain {domain}" ) return color_log.Show(INFO, f"Found {len(record_ids)} records to process") # Prepare related models data related = get_related_fields(odoo, args) if related and args.verbose: color_log.Show(INFO, f"Related models: {json.dumps(related, indent=2)}") # Parallel processing batches = chunker(record_ids, args.chunk_size) color_log.Show( INFO, f"Processing {len(batches)} batches with {args.process_size} workers" ) total_stats = [0, 0, 0] with mp.Pool(args.process_size) as pool: results = pool.imap_unordered( partial(process_batch, args, related=related), batches ) for deleted, archived, skipped in results: total_stats[0] += deleted total_stats[1] += archived total_stats[2] += skipped # Final report color_log.Show(OK, "\nOperation summary:") color_log.Show(OK, f"Total deleted: {total_stats[0]}") color_log.Show(OK, f"Total archived: {total_stats[1]}") color_log.Show(OK, f"Total skipped: {total_stats[2]}") color_log.Show( OK, f"Success rate: {(total_stats[0]+total_stats[1])/len(record_ids)*100:.1f}%" ) if args.dry_run: color_log.Show(WARNING, "Dry-run mode: No changes were made to the database") if __name__ == "__main__": try: main() except KeyboardInterrupt: color_log.Show(FAIL, "\nOperation cancelled by user") sys.exit(1)