Odoo18-Base/setup/record_cleaner.py
KaySar12 12f395fad4
All checks were successful
Setup Native Action / native (3.12.7) (push) Has been skipped
Setup Native Action / docker (3.12.7) (push) Has been skipped
update
2025-03-24 12:14:24 +07:00

260 lines
8.4 KiB
Python
Executable File

#!/usr/bin/env python
"""
Delete records from an Odoo database based on a model and domain filter.
Usage:
record_cleaner.py <db_name> <base_model>
Example:
record_cleaner.py mydb res.partner --domain "[('active', '=', False)]" --force
"""
import argparse
import ast
import json
import multiprocessing as mp
import os
import sys
from typing import Dict, List, Tuple
from functools import partial
import odoorpc
import color_log
# Default configuration
DEFAULT_HOST = "10.1.1.34"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
DEFAULT_DOMAIN = "[]"
DEFAULT_PROCESS_SIZE = min(mp.cpu_count() * 2, 32) # Dynamic default based on CPU
CACHE_DIR = "cache"
CHUNK_SIZE = 500 # Records per batch for search operations
# Logging levels
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def parse_arguments() -> argparse.Namespace:
    """Parse and validate command-line arguments.

    Returns:
        argparse.Namespace with connection options, the target model,
        the domain filter and the action flags.

    Exits:
        Status 1 when ``--domain`` is not a valid Python list literal.
    """
    parser = argparse.ArgumentParser(
        description="Safely delete records from an Odoo model with referential integrity checks.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("db_name", help="Database name")
    parser.add_argument("base_model", help="Model to delete records from")
    parser.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host")
    parser.add_argument(
        "--port", type=int, default=DEFAULT_PORT, help="Odoo server port"
    )
    parser.add_argument("--username", default=DEFAULT_USERNAME, help="Odoo username")
    parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Odoo password")
    parser.add_argument(
        "--domain", default=DEFAULT_DOMAIN, help="Domain filter as Python list"
    )
    parser.add_argument(
        "--process-size",
        type=int,
        default=DEFAULT_PROCESS_SIZE,
        help="Number of parallel processes",
    )
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=CHUNK_SIZE,
        help="Records per batch for search operations",
    )
    # --force used to sit alone in a mutually-exclusive group; a plain flag
    # is behaviorally identical until a second exclusive action exists.
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force delete with referential integrity bypass",
    )
    parser.add_argument(
        "--refresh-cache", action="store_true", help="Refresh related models cache"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Simulate operations without making changes",
    )
    parser.add_argument("--verbose", action="store_true", help="Show detailed output")
    args = parser.parse_args()
    # Fail fast: the domain must be a Python *list* literal, since both
    # main() and Odoo's search() expect a list of triples, not any literal.
    try:
        domain = ast.literal_eval(args.domain)
        if not isinstance(domain, list):
            raise ValueError("domain must be a list")
    except (ValueError, SyntaxError) as e:
        color_log.Show(FAIL, f"Invalid domain syntax: {e}")
        sys.exit(1)
    return args
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
    """Open and authenticate an odoorpc session from CLI options.

    Exits the process with status 1 on any connection or login failure,
    after printing a diagnostic.
    """
    try:
        client = odoorpc.ODOO(args.host, port=args.port)
        if args.verbose:
            color_log.Show(INFO, f"Available databases: {client.db.list()}")
        client.login(args.db_name, args.username, args.password)
        color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
        return client
    except odoorpc.error.RPCError as e:
        # Bad credentials / bad database name.
        color_log.Show(FAIL, f"Login failed: {e}")
        sys.exit(1)
    except Exception as e:
        # Network-level failures (host unreachable, refused port, ...).
        color_log.Show(FAIL, f"Connection error: {e}")
        sys.exit(1)
def get_related_fields(
    odoo: odoorpc.ODOO, args: argparse.Namespace
) -> Dict[str, List[str]]:
    """Map every other model to its fields that relate to the base model.

    Results are cached on disk under ``CACHE_DIR/<db>/<model>.cache.json``;
    pass ``--refresh-cache`` to rebuild.

    Returns:
        dict of model name -> list of relational field names pointing at
        ``args.base_model``.
    """
    cache_path = os.path.join(CACHE_DIR, args.db_name, f"{args.base_model}.cache.json")
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)

    # Serve from cache unless the caller explicitly asked for a rebuild.
    if os.path.exists(cache_path) and not args.refresh_cache:
        with open(cache_path, "r") as fh:
            color_log.Show(INFO, f"Loaded related models from cache: {args.base_model}")
            return json.load(fh)

    color_log.Show(INFO, f"Building related models cache for {args.base_model}...")
    relation_map: Dict[str, List[str]] = {}
    ir_model = odoo.env["ir.model"]
    other_ids = ir_model.search([("model", "!=", args.base_model)])
    for rec in ir_model.read(other_ids, ["model"]):
        model_name = rec["model"]
        try:
            field_defs = odoo.env[model_name].fields_get()
        except Exception as exc:
            # Some technical models refuse introspection; skip them quietly.
            if args.verbose:
                color_log.Show(WARNING, f"Skipping {model_name}: {str(exc)}")
            continue
        matches = [
            field_name
            for field_name, desc in field_defs.items()
            if desc.get("relation") == args.base_model
            and desc.get("type") in ["many2one", "many2many", "one2many"]
        ]
        if matches:
            relation_map[model_name] = matches

    with open(cache_path, "w") as fh:
        json.dump(relation_map, fh, indent=2)
    return relation_map
def chunker(seq: List[int], size: int) -> List[List[int]]:
    """Split *seq* into consecutive batches of at most *size* items each."""
    batches: List[List[int]] = []
    for start in range(0, len(seq), size):
        batches.append(seq[start : start + size])
    return batches
def process_batch(
    args: argparse.Namespace, batch: List[int], related: Dict[str, List[str]]
) -> Tuple[int, int, int]:
    """Archive or delete one batch of record ids in a worker process.

    Each worker opens its own Odoo connection — RPC sessions cannot be
    shared across processes.

    Args:
        args: Parsed CLI options (credentials, --force, --dry-run, ...).
        batch: Ids of ``args.base_model`` records to process.
        related: Mapping of model name -> fields referencing the base model.
            NOTE(review): currently unused — the original referential
            integrity search was commented out upstream, so without
            ``--force`` every record is archived unconditionally.

    Returns:
        ``(deleted, archived, skipped)`` counts for this batch. Dry-run
        records count toward none of the three.
    """
    deleted = archived = skipped = 0
    odoo = connect_to_odoo(args)
    model = odoo.env[args.base_model]
    for record_id in batch:
        try:
            if args.dry_run:
                color_log.Show(INFO, f"[DRY-RUN] Would process record {record_id}")
                continue
            if args.force:
                model.unlink([record_id])
                deleted += 1
                color_log.Show(OK, f"Deleted {args.base_model} ID {record_id}")
            else:
                # Soft-delete instead of unlink. Assumes the model has an
                # ``active`` field — models without one raise and are
                # counted as skipped below. TODO confirm intended fallback.
                model.write([record_id], {"active": False})
                archived += 1
                color_log.Show(OK, f"Archived {args.base_model} ID {record_id}")
        except odoorpc.error.RPCError as e:
            color_log.Show(WARNING, f"Error processing {record_id}: {e}")
            skipped += 1
        except Exception as e:
            color_log.Show(WARNING, f"Unexpected error with {record_id}: {e}")
            skipped += 1
    return deleted, archived, skipped
def main():
    """Drive the full clean-up: connect, search, fan out to workers, report."""
    args = parse_arguments()
    odoo = connect_to_odoo(args)

    # Abort early if the target model is unknown to this database.
    if args.base_model not in odoo.env:
        color_log.Show(FAIL, f"Model {args.base_model} does not exist")
        sys.exit(1)

    # Collect every matching record id (deterministic order for batching).
    domain = ast.literal_eval(args.domain)
    record_ids = odoo.env[args.base_model].search(
        domain, offset=0, limit=None, order="id"
    )
    if not record_ids:
        color_log.Show(
            WARNING, f"No records found in {args.base_model} with domain {domain}"
        )
        return
    color_log.Show(INFO, f"Found {len(record_ids)} records to process")

    related = get_related_fields(odoo, args)
    if related and args.verbose:
        color_log.Show(INFO, f"Related models: {json.dumps(related, indent=2)}")

    batches = chunker(record_ids, args.chunk_size)
    color_log.Show(
        INFO, f"Processing {len(batches)} batches with {args.process_size} workers"
    )

    # Fan batches out to worker processes and accumulate their counters.
    deleted_total = archived_total = skipped_total = 0
    worker = partial(process_batch, args, related=related)
    with mp.Pool(args.process_size) as pool:
        for deleted, archived, skipped in pool.imap_unordered(worker, batches):
            deleted_total += deleted
            archived_total += archived
            skipped_total += skipped

    # Final report.
    color_log.Show(OK, "\nOperation summary:")
    color_log.Show(OK, f"Total deleted: {deleted_total}")
    color_log.Show(OK, f"Total archived: {archived_total}")
    color_log.Show(OK, f"Total skipped: {skipped_total}")
    color_log.Show(
        OK, f"Success rate: {(deleted_total+archived_total)/len(record_ids)*100:.1f}%"
    )
    if args.dry_run:
        color_log.Show(WARNING, "Dry-run mode: No changes were made to the database")
# Script entry point: run the clean-up and exit 1 on Ctrl-C.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        color_log.Show(FAIL, "\nOperation cancelled by user")
        sys.exit(1)