Compare commits

..

25 Commits
main ... 18.0

Author SHA1 Message Date
f12b605180 update utility 2025-04-08 09:49:25 +07:00
4938a55ad7 update 2025-04-05 12:25:58 +07:00
ad32607fd2 update makefile 2025-04-05 08:56:59 +07:00
5ea37ce876 update update_modules script 2025-04-04 12:07:59 +07:00
34b51127c7 update 2025-04-04 11:47:45 +07:00
9034257c59 update 2025-04-04 11:47:16 +07:00
0c903e3732 update 2025-04-04 11:14:27 +07:00
f4061c164a update 2025-04-04 11:11:57 +07:00
cfb17c020d update 2025-04-04 11:11:39 +07:00
b77089bb79 update 2025-04-04 11:05:31 +07:00
fdf854d6d4 update 2025-04-04 11:02:18 +07:00
fc8e53bd67 update 2025-04-04 10:23:35 +07:00
6caea7e002 update utility 2025-04-04 10:22:52 +07:00
57286a15e3 update 2025-04-04 09:10:37 +07:00
ffc7cef059 update 2025-04-04 08:47:01 +07:00
dbd5a8325c update 2025-04-03 18:20:53 +07:00
b249bc7149 update 2025-04-03 17:05:42 +07:00
8ac2ca2ee8 update 2025-04-03 16:45:10 +07:00
9001a02c59 update 2025-04-03 16:31:42 +07:00
b29630e1b6 update 2025-04-03 16:20:17 +07:00
422b142d90 update project structure 2025-04-03 15:58:08 +07:00
43760c6565 update scripts 2025-04-03 15:33:38 +07:00
d7eeb8b1fe update 2025-04-03 11:57:16 +07:00
2005a8b511 update 2025-04-03 10:19:58 +07:00
55253a5afc update 2025-04-03 10:06:04 +07:00
40 changed files with 958 additions and 1546 deletions

6
.gitignore vendored
View File

@ -6,7 +6,6 @@ __pycache__/
# C extensions
*.so
# Distribution / packaging
.Python
build/
@ -15,7 +14,7 @@ dist/
downloads/
eggs/
.eggs/
lib/
# lib/
lib64/
parts/
sdist/
@ -138,7 +137,7 @@ venv.bak/
# mkdocs documentation
/site
config/*.yaml
# mypy
.mypy_cache/
.dmypy.json
@ -175,3 +174,4 @@ cython_debug/
# .nfs files are created when an open file is removed but is still being accessed
.nfs*

0
__init__.py Normal file
View File

View File

@ -1,103 +0,0 @@
#!/usr/bin/env python
import shutil
import odoorpc
import color_log
import argparse
import sys
import base64
import os
from datetime import datetime
# Default configuration
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
BACKUP_DIR = "odoo_backups"
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
    """Create an odoorpc client for args.host/args.port; exit(1) on failure.

    NOTE: authentication is currently disabled (login call commented out);
    only the RPC connection itself is exercised, via the database listing.
    """
    try:
        client = odoorpc.ODOO(args.host, port=args.port)
        color_log.Show(INFO, f"Available databases: {client.db.list()}")
        # odoo.login(args.db_name, args.username, args.password)
        # color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
        return client
    except odoorpc.error.RPCError as e:
        color_log.Show(FAIL, f"Login failed: {e}")
        sys.exit(1)
    except Exception as e:
        color_log.Show(FAIL, f"Connection error: {e}")
        sys.exit(1)
def parse_arguments() -> argparse.Namespace:
    """Define and parse the backup tool's command-line options."""
    cli = argparse.ArgumentParser(description="Backup all Odoo databases.")
    cli.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host")
    cli.add_argument(
        "--port", type=int, default=DEFAULT_PORT, help="Odoo server port"
    )
    cli.add_argument(
        "--admin-password", required=True, help="Odoo master admin password"
    )
    # With no --database values, every database on the server is backed up.
    cli.add_argument(
        "--database",
        nargs="*",
        help="Specific databases to backup (leave empty to backup all databases)",
    )
    return cli.parse_args()
def backup_database(odoo: odoorpc.ODOO, db_name: str, admin_password: str):
    """Backup a single Odoo database into BACKUP_DIR as <db>-<MM-DD-YYYY>.zip.

    Temporarily raises the RPC timeout to 10 minutes for the dump and now
    restores the previous timeout even when the dump raises (the original
    code leaked the 600s timeout on failure).
    """
    date_str = datetime.now().strftime("%m-%d-%Y")
    try:
        print(f"Backing up database: {db_name}...")
        saved_timeout = odoo.config["timeout"]
        odoo.config["timeout"] = 600  # dumps can be slow; allow 10 minutes
        try:
            backup_data = odoo.db.dump(admin_password, db_name)
        finally:
            # Always restore the caller's timeout, even if the dump failed.
            odoo.config["timeout"] = saved_timeout
        os.makedirs(BACKUP_DIR, exist_ok=True)
        backup_path = os.path.join(BACKUP_DIR, f"{db_name}-{date_str}.zip")
        # odoo.db.dump returns a file-like (BytesIO) object; stream it to disk.
        with open(backup_path, "wb") as f:
            f.write(backup_data.read())
        print(f"Backup saved: {backup_path}")
    except Exception as e:
        print(f"Failed to backup {db_name}: {e}")
def backup_all_databases(odoo: odoorpc.ODOO, admin_password: str):
    """Back up every database visible on the connected Odoo server."""
    try:
        databases = odoo.db.list()
        print("Databases found:", databases)
        for name in databases:
            backup_database(odoo, name, admin_password)
    except Exception as e:
        print(f"Error retrieving database list: {e}")
def main():
    """Entry point: connect, then back up the selected (or all) databases."""
    args = parse_arguments()
    odoo = connect_to_odoo(args)
    targets = args.database
    if not targets:
        backup_all_databases(odoo, args.admin_password)
    else:
        for name in targets:
            backup_database(odoo, name, args.admin_password)
if __name__ == "__main__":
    # Script entry point: Ctrl-C is reported as a FAIL and exits with code 1.
    try:
        main()
    except KeyboardInterrupt:
        color_log.Show(FAIL, "\nOperation cancelled by user")
        sys.exit(1)

View File

@ -1,27 +0,0 @@
#!/bin/bash
# Delete old pyenv virtualenvs whose names contain the current git branch,
# keeping the $1 most recent (oldest assumed to be listed first).
#
# Usage: <script> <number-of-virtualenvs-to-keep>

# Fail early with a usage message instead of an arithmetic error when the
# keep-count argument is missing.
keep_count_arg="${1:?usage: $0 <number-of-virtualenvs-to-keep>}"

# Get the current branch name
branch_name=$(git rev-parse --abbrev-ref HEAD)

# All virtualenv names containing the branch name, de-duplicated.
# `|| true` keeps the command from failing when grep finds no match.
virtualenvs=$(pyenv virtualenvs | awk '{print $1}' | sort -u | grep "$branch_name" || true)

# With no matches, `wc -l` on an empty string still reports 1 line, which
# would skew the arithmetic below — bail out explicitly instead.
if [[ -z "$virtualenvs" ]]; then
    echo "No virtual environments containing '$branch_name' found."
    exit 0
fi

# Count the number of virtual environments
count=$(echo "$virtualenvs" | wc -l)

# How many environments exceed the keep limit
delete_count=$((count - keep_count_arg))

if (( delete_count > 0 )); then
    # Get the oldest virtual environments (assuming they are listed first)
    oldest_venvs=$(echo "$virtualenvs" | head -n "$delete_count")
    # Loop through the oldest virtual environments and delete them
    for venv in $oldest_venvs; do
        echo "Deleting virtual environment: $venv"
        pyenv virtualenv-delete -f "$venv"
    done
fi

echo "Old virtual environments containing '$branch_name' deleted."

0
cli/__init__.py Normal file
View File

0
cli/database.py Normal file
View File

71
cli/git.py Normal file
View File

@ -0,0 +1,71 @@
# cli/module.py
import argparse
from services.git.handler import GitHandler
import lib.color_log as color_log
def setup_cli(subparsers):
    """Register the `git` sub-command and its clone/pull/commit actions."""
    git_parser = subparsers.add_parser('git', help='Git operations for Odoo instances')
    git_subparsers = git_parser.add_subparsers(dest='git_command', help='Git commands')

    # clone: fetch a fresh copy of an instance's repository.
    clone = git_subparsers.add_parser('clone', help='Clone repository for an instance')
    clone.add_argument('instance_name', help='Name of the instance to clone repository for')
    clone.add_argument('--branch', help='Branch to clone (optional)')

    # pull: update an existing checkout, optionally discarding local changes.
    pull = git_subparsers.add_parser('pull', help='Pull updates for an instance')
    pull.add_argument('instance_name', help='Name of the instance to pull updates for')
    pull.add_argument('--branch', help='Branch to pull from (optional)')
    pull.add_argument('--force', action='store_true', help='Force pull with hard reset (discards local changes)')

    # commit: report the checkout's current commit hash.
    commit = git_subparsers.add_parser('commit', help='Get current commit hash')
    commit.add_argument('instance_name', help='Name of the instance to get commit for')

    git_parser.set_defaults(func=git)
def git(args):
    """Dispatch a `git` sub-command (clone/pull/commit) to GitHandler.

    All errors are reported through color_log rather than raised, so the CLI
    never shows a traceback to the user.
    """
    git_handler = GitHandler(config_path="utility/config/settings.yaml")
    if args.git_command == 'clone':
        try:
            success = git_handler.clone_or_open_repo(
                instance_name=args.instance_name,
                branch=args.branch
            )
            if success:
                color_log.Show("OK", f"Successfully cloned repository for {args.instance_name}")
            else:
                color_log.Show("FAILED", f"Failed to clone repository for {args.instance_name}")
        except Exception as e:
            color_log.Show("FAILED", f"Error cloning repository: {str(e)}")
    elif args.git_command == 'pull':
        try:
            success = git_handler.pull_updates(
                instance_name=args.instance_name,
                branch=args.branch,
                force=args.force
            )
            if success:
                if args.force:
                    color_log.Show("OK", f"Successfully force pulled updates for {args.instance_name}")
                else:
                    color_log.Show("OK", f"Successfully pulled updates for {args.instance_name}")
            else:
                color_log.Show("FAILED", f"Failed to pull updates for {args.instance_name}")
        except Exception as e:
            color_log.Show("FAILED", f"Error pulling updates: {str(e)}")
    elif args.git_command == 'commit':
        try:
            # NOTE(review): get_current_commit() takes no instance argument —
            # confirm GitHandler resolves the repo for args.instance_name
            # internally, otherwise the reported instance may be misleading.
            commit_hash = git_handler.get_current_commit()
            if commit_hash:
                color_log.Show("INFO", f"Current commit for {args.instance_name}: {commit_hash}")
            else:
                color_log.Show("WARNING", f"No commit found for {args.instance_name}")
        except Exception as e:
            color_log.Show("FAILED", f"Error getting commit: {str(e)}")
    else:
        # Fixed: "ERROR" is not a status color_log.Show understands (the
        # status table defines OK/FAILED/INFO/WARNING); use FAILED so the
        # message renders like every other failure path in this module.
        color_log.Show("FAILED", "Please specify a valid git command (clone/pull/commit)")

55
cli/module.py Normal file
View File

@ -0,0 +1,55 @@
# cli/module.py
import tqdm
from services.odoo.module import OdooModuleManager
import lib.color_log as color_log
def setup_cli(subparsers):
    """Register the `module` sub-command (install/uninstall/upgrade)."""
    parser = subparsers.add_parser("module", help="Manage instance module")
    parser.add_argument(
        "action", choices=["install", "uninstall", "upgrade"], help="Module action"
    )
    parser.add_argument("instance", type=str, help="Instance Name")
    parser.add_argument(
        "--modules", "-m", nargs="+", help="List of modules to process"
    )
    parser.set_defaults(func=module)
    return parser
def module(args):
    """Apply the requested action to each target module of an instance."""
    manager = OdooModuleManager(config_path="utility/config/settings.yaml")

    if args.modules:
        color_log.Show(
            "INFO",
            f"Processing modules: {', '.join(args.modules)} for {args.instance}",
        )
    else:
        # Fall back to the instance's configured module list.
        color_log.Show(
            "INFO",
            f"No modules specified. Using default modules for {args.instance}",
        )
        args.modules = manager.get_modules(args.instance)

    for module_name in tqdm.tqdm(
        args.modules, desc="Processing modules", unit="module"
    ):
        if args.action == "install":
            manager.install(args.instance, [module_name])
        elif args.action == "uninstall":
            manager.uninstall(args.instance, [module_name])
        elif args.action == "upgrade":
            # Upgrading an absent module would fail, so install it first.
            if not manager.is_module_installed(args.instance, module_name):
                color_log.Show(
                    "INFO",
                    f"Module {module_name} not installed. Installing first...",
                )
                manager.install(args.instance, [module_name])
            manager.upgrade(args.instance, [module_name])
        else:
            color_log.Show(
                "FAILED",
                f"Invalid action '{args.action}' for module management.",
            )

29
cli/service.py Normal file
View File

@ -0,0 +1,29 @@
# cli/service.py
from services.odoo.service import OdooServiceManager
import lib.color_log as color_log
def setup_cli(subparsers):
    """Register the `service` sub-command (start/stop/restart an instance)."""
    parser = subparsers.add_parser("service", help="Manage instance service")
    parser.add_argument(
        "action", choices=["start", "stop", "restart"], help="Service action"
    )
    parser.add_argument("instance", type=str, help="Instance Name")
    parser.set_defaults(func=service)
    return parser
def service(args):
service = OdooServiceManager(config_path="utility/config/settings.yaml")
match args.action:
case "start":
service.start_service(args.instance)
case "stop":
service.stop_service(args.instance)
case "restart":
service.restart_service(args.instance)
case _:
color_log.Show(
"FAILED",
f"Invalid action '{args.action}' for service management.",
)

32
config/settings.template Normal file
View File

@ -0,0 +1,32 @@
# Settings template — copy to settings.yaml and replace every <placeholder>.
# `common`, `git`, and `ssh` define YAML anchors; each instance merges the
# shared credentials via `<<: *common`.
common: &common
  username: "<username>"
  password: "<password>"
  type: "<service_type>"
  service_name: "<service_name>"

# Git checkout settings (anchor *git_config — presumably referenced by
# instances or services elsewhere; confirm usage before removing).
git: &git_config
  repo_url: "<git_repo_url>"
  branch: "<branch_name>"
  local_path: "<local_git_path>"

# SSH access settings (anchor *ssh_config — same note as above).
ssh: &ssh_config
  user: <ssh_user>
  key_path: "<ssh_key_path>"

odoo_instances:
  - name: "<instance_name_1>"
    host: "<instance_host_1>"
    port: <instance_port_1>
    database: "<instance_database_1>"
    modules:
      - "<module_1>"
      - "<module_2>"
    <<: *common # Inherit common settings
  - name: "<instance_name_2>"
    host: "<instance_host_2>"
    port: <instance_port_2>
    database: "<instance_database_2>"
    modules:
      - "<module_4>"
      - "<module_5>"
      - "<module_6>"
    <<: *common # Inherit common settings

View File

@ -1,82 +0,0 @@
#!/usr/bin/bash
# Download a backup archive into ./deployment/backup.
# Usage: <script> <download_url> <target_file_name>
export PATH=/usr/sbin:$PATH
export DEBIAN_FRONTEND=noninteractive
set -euo pipefail

readonly COLOUR_RESET='\e[0m'
readonly aCOLOUR=(
    '\e[38;5;154m' # green | Lines, bullets and separators
    '\e[1m'        # Bold white | Main descriptions
    '\e[90m'       # Grey | Credits
    '\e[91m'       # Red | Update notifications Alert
    '\e[33m'       # Yellow | Emphasis
)

trap 'onCtrlC' INT
# Restore the terminal colour before aborting on Ctrl-C.
onCtrlC() {
    echo -e "${COLOUR_RESET}"
    exit 1
}

# Show <level> <message>: 0=OK, 1=FAILED (exits 1), 2=INFO, 3=NOTICE.
Show() {
    # OK
    if (($1 == 0)); then
        echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[0]} OK $COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
    # FAILED
    elif (($1 == 1)); then
        echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[3]}FAILED$COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
        exit 1
    # INFO
    elif (($1 == 2)); then
        echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[0]} INFO $COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
    # NOTICE
    elif (($1 == 3)); then
        echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[4]}NOTICE$COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
    fi
}

# Print a message in the alert (red) colour.
Warn() {
    echo -e "${aCOLOUR[3]}$1$COLOUR_RESET"
}

# Switch subsequent output to grey, without a trailing newline.
GreyStart() {
    echo -e "${aCOLOUR[2]}\c"
}

# Restore the default output colour, without a trailing newline.
ColorReset() {
    echo -e "$COLOUR_RESET\c"
}

# main <download_url> <file_name>: validate the URL with a HEAD request,
# then download the file into $PWD/deployment/backup/<file_name>.
main() {
    DEPLOYMENT_DIR=$(pwd)/deployment
    BACKUP_DIR="$DEPLOYMENT_DIR/backup"
    DOWNLOAD_URL="$1"
    BACKUP_FILE="$BACKUP_DIR/$2"
    # Check if the deployment and backup directories exist, create them if not
    if [[ ! -d "$BACKUP_DIR" ]]; then
        echo "Backup directory does not exist. Creating: $BACKUP_DIR"
        mkdir -p "$BACKUP_DIR"
    fi
    # Check if the download URL is valid
    echo "Checking if the URL is valid: $DOWNLOAD_URL"
    if curl --head --silent --fail "$DOWNLOAD_URL" > /dev/null; then
        echo "URL is valid. Proceeding with download..."
    else
        Show 1 "Error: Invalid or inaccessible URL: $DOWNLOAD_URL"
        exit 1
    fi
    # Download the file and rename it to backup.zip
    wget -O "$BACKUP_FILE" "$DOWNLOAD_URL"
    # Check if the file was downloaded
    if [[ -f "$BACKUP_FILE" ]]; then
        Show 0 "Backup file successfully downloaded to: $BACKUP_FILE"
    else
        Show 1 "Error: Backup file was not downloaded."
        exit 1
    fi
}

main "$@"

View File

@ -1,67 +0,0 @@
#!/usr/bin/env python3
import argparse
import configparser
import shutil
import os
from dotenv import set_key
from pathlib import Path
import socket
import secrets
import string
import color_log
def find_available_port(start_port=80):
    """Return the first TCP port >= start_port that can be bound on 0.0.0.0.

    Probes by binding; the socket is closed on return, so the port is only
    *likely* still free when the caller uses it (inherent check-then-use race).
    """
    import errno  # local import keeps this helper self-contained

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        while True:
            try:
                sock.bind(('0.0.0.0', start_port))
                color_log.Show(3, f" {start_port} is Open")
                return start_port
            except OSError as e:
                # Symbolic constant instead of the Linux-specific magic 98,
                # so the probe also works on non-Linux platforms.
                if e.errno == errno.EADDRINUSE:
                    print(f"{start_port} already in use , Try other port ...")
                    start_port += 1
                else:
                    raise
def main():
    """
    Generate an Odoo configuration file.

    Finds an available HTTP port starting at 8069, copies the config
    template, and fills in the database/addons settings passed on the
    command line.
    """
    parser = argparse.ArgumentParser(description="Generate Odoo configuration")
    parser.add_argument('--db_user', type=str, help='')
    parser.add_argument('--db_pass', type=str, help='')
    parser.add_argument('--deploy_path', type=str, help='')
    parser.add_argument('--addons_path', type=str, help='')
    # parser.add_argument('--db_filter', type=str, help='')
    parser.add_argument('--db_port', type=int, help='')
    parser.add_argument('--db_server', type=str, help='')
    args = parser.parse_args()

    db_port = args.db_port
    db_user = args.db_user
    db_pass = args.db_pass
    db_server = args.db_server
    app_port = find_available_port(8069)
    addons_path = args.addons_path
    base_dir = args.deploy_path
    # db_filter= args.db_filter

    # NOTE(review): {base_dir}/etc is created but the config is written to
    # {base_dir}/odoo.conf — confirm whether etc/ is actually needed.
    os.makedirs(f"{base_dir}/etc", exist_ok=True)
    # Fixed log message: the template is copied to {base_dir}/odoo.conf,
    # not {base_dir}/etc/odoo.conf as the old message claimed.
    color_log.Show(3, f"Copy {base_dir}/odoo.conf.template to {base_dir}/odoo.conf")
    shutil.copyfile(f'{base_dir}/odoo.conf.template', f'{base_dir}/odoo.conf')

    # Update the freshly copied Odoo configuration file in place.
    config = configparser.ConfigParser()
    config.read(f'{base_dir}/odoo.conf')
    config['options']['db_host'] = str(db_server)
    config['options']['db_user'] = db_user
    config['options']['db_password'] = db_pass
    config['options']['db_port'] = str(db_port)
    config['options']['addons_path'] = addons_path
    config['options']['xmlrpc_port'] = str(app_port)
    config['options']['dbfilter'] = ".*"      # serve all databases
    config['options']['proxy_mode'] = "True"  # expected to run behind a reverse proxy
    with open(f'{base_dir}/odoo.conf', 'w') as configfile:
        config.write(configfile)
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View File

@ -9,16 +9,16 @@ def colorize(text, code):
def Show(status, message):
"""Displays a message with a status indicator."""
colors = {
0: (
'OK': (
colorize("[", "90") + colorize(" OK ", "38;5;154") + colorize("]", "90")
), # Green, Grey
1: (
'FAILED': (
colorize("[", "90") + colorize(" FAILED ", "91") + colorize("]", "90")
), # Red, Grey
2: (
'INFO': (
colorize("[", "90") + colorize(" INFO ", "38;5;154") + colorize("]", "90")
), # Green, Grey
3: (
'WARNING': (
colorize("[", "90") + colorize(" WARNING ", "33") + colorize("]", "90")
), # Yellow, Grey
}

31
main.py Normal file
View File

@ -0,0 +1,31 @@
# main.py
import argparse
from cli.service import setup_cli as setup_service_cli
from cli.module import setup_cli as setup_module_cli
from cli.git import setup_cli as setup_git_cli
def setup_cli():
    """Build the top-level argument parser and register all sub-commands."""
    parser = argparse.ArgumentParser(description="Service Manager CLI")
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose mode"
    )
    subparsers = parser.add_subparsers(dest="command", required=True)
    # Each registrar attaches its own sub-parser and `func` handler.
    for register in (setup_service_cli, setup_module_cli, setup_git_cli):
        register(subparsers)
    return parser
def main():
    """Parse CLI arguments and dispatch to the selected sub-command handler."""
    args = setup_cli().parse_args()
    handler = getattr(args, "func", None)
    if handler is None:
        print("Invalid command. Use --help for more details.")
    else:
        handler(args)
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View File

@ -1,44 +0,0 @@
import subprocess
import yaml
import os
import argparse

# Checkout modules that exist on the target branch but not on the source
# branch, from the target branch into the source-branch working tree.
# The YAML file is assumed to look like (confirm against actual config):
#   branches:
#     <branch_name>:
#       modules: [mod_a, mod_b, ...]

# Set up argument parsing
parser = argparse.ArgumentParser(
    description="Checkout modules from target branch that are not in source branch."
)
parser.add_argument("yaml_file", help="Path to the YAML file")
parser.add_argument("source_branch", help="The source branch")
parser.add_argument("target_branch", help="The target branch")
parser.add_argument("root_repo", help="Path to the root repository")

# Parse the arguments
args = parser.parse_args()
yaml_file = args.yaml_file
source_branch = args.source_branch
target_branch = args.target_branch
root_repo = args.root_repo

# Change to the repository directory so the git commands below run there.
os.chdir(root_repo)

# Read YAML file
with open(yaml_file, "r") as file:
    data = yaml.safe_load(file)

# Extract module lists for source and target branches (empty if absent).
modules_source = data["branches"].get(source_branch, {}).get("modules", [])
modules_target = data["branches"].get(target_branch, {}).get("modules", [])

# Ensure the latest changes are fetched
subprocess.run(["git", "fetch", "origin"], check=True)

# Checkout source branch first
print(f"Checking out source branch: {source_branch}")
subprocess.run(["git", "checkout", source_branch], check=True)

# Checkout modules in target_branch that are not in source_branch
for module in modules_target:
    if module not in modules_source:
        print(f"Checking out module: {module}")
        subprocess.run(["git", "checkout", target_branch, "--", module], check=True)

View File

@ -1,259 +0,0 @@
#!/usr/bin/env python
"""
Delete records from an Odoo database based on a model and domain filter.
Usage:
delete_records.py <db_name> <base_model>
Example:
delete_records.py mydb res.partner --domain "[('active', '=', False)]" --force
"""
import argparse
import ast
import json
import multiprocessing as mp
import os
import sys
from typing import Dict, List, Tuple
from functools import partial
import odoorpc
import color_log
# Default configuration
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
DEFAULT_DOMAIN = "[]"
DEFAULT_PROCESS_SIZE = min(mp.cpu_count() * 2, 32) # Dynamic default based on CPU
CACHE_DIR = "cache"
CHUNK_SIZE = 500 # Records per batch for search operations
# Logging levels
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def parse_arguments() -> argparse.Namespace:
    """Build the CLI, parse sys.argv, and pre-validate the --domain literal."""
    parser = argparse.ArgumentParser(
        description="Safely delete records from an Odoo model with referential integrity checks.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # Positional arguments.
    parser.add_argument("db_name", help="Database name")
    parser.add_argument("base_model", help="Model to delete records from")
    # Connection options.
    parser.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host")
    parser.add_argument(
        "--port", type=int, default=DEFAULT_PORT, help="Odoo server port"
    )
    parser.add_argument("--username", default=DEFAULT_USERNAME, help="Odoo username")
    parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Odoo password")
    # Record selection and batching.
    parser.add_argument(
        "--domain", default=DEFAULT_DOMAIN, help="Domain filter as Python list"
    )
    parser.add_argument(
        "--process-size",
        type=int,
        default=DEFAULT_PROCESS_SIZE,
        help="Number of parallel processes",
    )
    parser.add_argument(
        "--chunk-size",
        type=int,
        default=CHUNK_SIZE,
        help="Records per batch for search operations",
    )
    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument(
        "--force",
        action="store_true",
        help="Force delete with referential integrity bypass",
    )
    parser.add_argument(
        "--refresh-cache", action="store_true", help="Refresh related models cache"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Simulate operations without making changes",
    )
    parser.add_argument("--verbose", action="store_true", help="Show detailed output")
    parsed = parser.parse_args()
    # Fail fast on a malformed --domain instead of failing mid-run.
    try:
        ast.literal_eval(parsed.domain)
    except (ValueError, SyntaxError) as e:
        color_log.Show(FAIL, f"Invalid domain syntax: {e}")
        sys.exit(1)
    return parsed
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
    """Connect and authenticate to the Odoo server; exit(1) on any failure."""
    try:
        client = odoorpc.ODOO(args.host, port=args.port)
        if args.verbose:
            color_log.Show(INFO, f"Available databases: {client.db.list()}")
        client.login(args.db_name, args.username, args.password)
        color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
        return client
    except odoorpc.error.RPCError as e:
        color_log.Show(FAIL, f"Login failed: {e}")
        sys.exit(1)
    except Exception as e:
        color_log.Show(FAIL, f"Connection error: {e}")
        sys.exit(1)
def get_related_fields(
    odoo: odoorpc.ODOO, args: argparse.Namespace
) -> Dict[str, List[str]]:
    """Retrieve related fields with cache management.

    Returns {model_name: [field_names]} for every relational field
    (many2one/many2many/one2many) in the database that targets
    ``args.base_model``.  Results are cached as JSON per database under
    CACHE_DIR; pass --refresh-cache to rebuild.
    """
    cache_path = os.path.join(CACHE_DIR, args.db_name, f"{args.base_model}.cache.json")
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    # Fast path: serve from the on-disk cache unless a refresh was requested.
    if not args.refresh_cache and os.path.exists(cache_path):
        with open(cache_path, "r") as f:
            color_log.Show(INFO, f"Loaded related models from cache: {args.base_model}")
            return json.load(f)
    color_log.Show(INFO, f"Building related models cache for {args.base_model}...")
    related = {}
    Model = odoo.env["ir.model"]
    # Scan every model except the base model itself.
    model_ids = Model.search([("model", "!=", args.base_model)])
    for model in Model.read(model_ids, ["model"]):
        try:
            fields = odoo.env[model["model"]].fields_get()
            related_fields = [
                name
                for name, desc in fields.items()
                if desc.get("relation") == args.base_model
                and desc.get("type") in ["many2one", "many2many", "one2many"]
            ]
            if related_fields:
                related[model["model"]] = related_fields
        except Exception as e:
            # Some models refuse introspection over RPC; skip them quietly
            # (verbosely when --verbose is set).
            if args.verbose:
                color_log.Show(WARNING, f"Skipping {model['model']}: {str(e)}")
    with open(cache_path, "w") as f:
        json.dump(related, f, indent=2)
    return related
def chunker(seq: List[int], size: int) -> List[List[int]]:
    """Split *seq* into consecutive sublists of at most *size* elements."""
    return [seq[start : start + size] for start in range(0, len(seq), size)]
def process_batch(
    args: argparse.Namespace, batch: List[int], related: Dict[str, List[str]]
) -> Tuple[int, int, int]:
    """Process a batch of records with proper error handling.

    Runs in a worker process: opens its own Odoo connection, then either
    archives (default) or hard-deletes (--force) each record in *batch*.
    Returns the counts (deleted, archived, skipped).
    """
    deleted = archived = skipped = 0
    # Each worker opens its own RPC connection (not shared across processes).
    odoo = connect_to_odoo(args)
    model = odoo.env[args.base_model]
    for record_id in batch:
        try:
            if args.dry_run:
                color_log.Show(INFO, f"[DRY-RUN] Would process record {record_id}")
                continue
            # Check references
            if not args.force:
                # NOTE: the referential check below is intentionally disabled;
                # without --force every record is archived rather than deleted.
                # referenced = any(
                #     odoo.env[rel_model].search_count([(field, "=", record_id)])
                #     for rel_model, fields in related.items()
                #     for field in fields
                # )
                # if referenced:
                model.write([record_id], {"active": False})
                archived += 1
                color_log.Show(OK, f"Archived {args.base_model} ID {record_id}")
                continue
            else:
                model.unlink([record_id])
                deleted += 1
                color_log.Show(OK, f"Deleted {args.base_model} ID {record_id}")
        except odoorpc.error.RPCError as e:
            color_log.Show(WARNING, f"Error processing {record_id}: {e}")
            skipped += 1
        except Exception as e:
            color_log.Show(WARNING, f"Unexpected error with {record_id}: {e}")
            skipped += 1
    return deleted, archived, skipped
def main():
    """Main execution flow.

    Parses arguments, finds the matching records, then archives/deletes
    them in parallel batches and prints a summary report.
    """
    args = parse_arguments()
    odoo = connect_to_odoo(args)
    # Validate model exists
    if args.base_model not in odoo.env:
        color_log.Show(FAIL, f"Model {args.base_model} does not exist")
        sys.exit(1)
    # Retrieve records (domain syntax was already validated in parse_arguments).
    domain = ast.literal_eval(args.domain)
    record_ids = odoo.env[args.base_model].search(
        domain, offset=0, limit=None, order="id"
    )
    if not record_ids:
        color_log.Show(
            WARNING, f"No records found in {args.base_model} with domain {domain}"
        )
        return
    color_log.Show(INFO, f"Found {len(record_ids)} records to process")
    # Prepare related models data
    related = get_related_fields(odoo, args)
    if related and args.verbose:
        color_log.Show(INFO, f"Related models: {json.dumps(related, indent=2)}")
    # Parallel processing: one batch per task; each worker opens its own
    # RPC connection inside process_batch.
    batches = chunker(record_ids, args.chunk_size)
    color_log.Show(
        INFO, f"Processing {len(batches)} batches with {args.process_size} workers"
    )
    total_stats = [0, 0, 0]
    with mp.Pool(args.process_size) as pool:
        results = pool.imap_unordered(
            partial(process_batch, args, related=related), batches
        )
        # Accumulate (deleted, archived, skipped) from each finished batch.
        for deleted, archived, skipped in results:
            total_stats[0] += deleted
            total_stats[1] += archived
            total_stats[2] += skipped
    # Final report
    color_log.Show(OK, "\nOperation summary:")
    color_log.Show(OK, f"Total deleted: {total_stats[0]}")
    color_log.Show(OK, f"Total archived: {total_stats[1]}")
    color_log.Show(OK, f"Total skipped: {total_stats[2]}")
    color_log.Show(
        OK, f"Success rate: {(total_stats[0]+total_stats[1])/len(record_ids)*100:.1f}%"
    )
    if args.dry_run:
        color_log.Show(WARNING, "Dry-run mode: No changes were made to the database")
if __name__ == "__main__":
    # Entry point: Ctrl-C aborts cleanly with a FAIL message and exit code 1.
    try:
        main()
    except KeyboardInterrupt:
        color_log.Show(FAIL, "\nOperation cancelled by user")
        sys.exit(1)

View File

@ -1,244 +0,0 @@
# -*- coding: utf-8 -*-
import re
from bs4 import formatter, BeautifulSoup as bs
from pathlib import Path
xml_4indent_formatter = formatter.XMLFormatter(indent=4)
NEW_ATTRS = {'required', 'invisible', 'readonly', 'column_invisible'}
percent_d_regex = re.compile("%\('?\"?[\w\.\d_]+'?\"?\)d")
def get_files_recursive(path):
    """Lazily yield (as strings) every .xml file under *path*, recursively."""
    for entry in Path(path).glob('**/*.xml'):
        if entry.is_file():
            yield str(entry)
root_dir = input('Enter root directory to check (empty for current directory) : ')
root_dir = root_dir or '.'
all_xml_files = get_files_recursive(root_dir)
def normalize_domain(domain):
    """Normalize an Odoo domain by inserting the implicit '&' operators.

    Adapted from odoo/osv/expression.py: walks the prefix-notation domain
    and prepends '&' wherever two terms appear without an explicit operator,
    so every operator's arity is satisfied.  Leaf terms are coerced to tuples.
    """
    if len(domain) == 1:
        return domain
    arity = {'!': 1, '&': 2, '|': 2}
    normalized = []
    pending = 1  # how many more expressions the operators seen so far expect
    for token in domain:
        if pending == 0:
            # More terms than expected (e.g. [A, B]) — AND them implicitly.
            normalized.insert(0, '&')
            pending = 1
        if isinstance(token, (list, tuple)):
            pending -= 1
            token = tuple(token)
        else:
            pending += arity.get(token, 0) - 1
        normalized.append(token)
    return normalized
def stringify_leaf(leaf):
    """Render a single domain leaf (field, operator, value) as a Python expression.

    '=' becomes '=='; '[not ]like' becomes '[not ]in' with operands swapped
    (value in field).  String values are quoted unless they look like a
    boolean/0/1 literal; other value types are emitted via str().
    """
    field, op, value = leaf[0], str(leaf[1]), leaf[2]
    flipped = False
    if op == '=':
        op = '=='
    elif 'like' in op:
        op = 'not in' if 'not' in op else 'in'
        flipped = True  # membership reads "value in field"
    if value in ('True', 'False', '1', '0') or type(value) in (list, tuple, set, int, float, bool):
        value = str(value)
    else:
        value = "'" + value + "'"
    if flipped:
        return "%s %s %s" % (value, op, field)
    return "%s %s %s" % (field, op, value)
def stringify_attr(stack):
    """Convert a whole attrs domain (prefix notation) into a Python expression string.

    Booleans and 0/1 literals pass through unchanged; otherwise the domain is
    normalized, reversed, and folded stack-wise into and/or/not expressions.
    """
    if stack in (True, False, 'True', 'False', 1, 0, '1', '0'):
        return stack
    # Index (counted from the reversed stack) of the last leaf; beyond it the
    # surrounding parentheses are dropped to avoid a redundant outer pair.
    last_parenthesis_index = max(index for index, item in enumerate(stack[::-1]) if item not in ('|', '!'))
    stack = normalize_domain(stack)
    # Process in reverse so operands are always popped before their operator.
    stack = stack[::-1]
    result = []
    for index, leaf_or_operator in enumerate(stack):
        if leaf_or_operator == '!':
            expr = result.pop()
            result.append('(not (%s))' % expr)
        elif leaf_or_operator == '&' or leaf_or_operator == '|':
            left = result.pop()
            # In case of a single | or single &, we expect that it's a tag that
            # has an attribute AND a state; the state will be added as OR in
            # states management, so keep a dangling "x and"/"x or" here.
            try:
                right = result.pop()
            except IndexError:
                res = left + ('%s' % ' and' if leaf_or_operator=='&' else ' or')
                result.append(res)
                continue
            form = '(%s %s %s)'
            if index > last_parenthesis_index:
                form = '%s %s %s'
            result.append(form % (left, 'and' if leaf_or_operator=='&' else 'or', right))
        else:
            result.append(stringify_leaf(leaf_or_operator))
    # A fully-reduced stack leaves exactly one expression.
    result = result[0]
    return result
def get_new_attrs(attrs):
    """Parse an old-style `attrs` dict literal and convert each recognized key
    (NEW_ATTRS) into its new-style Python-expression string."""
    new_attrs = {}
    # SECURITY NOTE(review): eval() executes arbitrary code from the XML file.
    # Acceptable only because input is the developer's own source tree;
    # consider ast.literal_eval if these files can come from elsewhere.
    attrs_dict = eval(attrs.strip())
    for attr in NEW_ATTRS:
        if attr in attrs_dict.keys():
            new_attrs[attr] = stringify_attr(attrs_dict[attr])
    # Rebuild limited to the recognized attribute names.
    ordered_attrs = {attr: new_attrs[attr] for attr in NEW_ATTRS if attr in new_attrs}
    return ordered_attrs
# Prettify puts <attribute> on three lines (1/ opening tag, 2/ text, 3/ closing tag), not very cool.
# Taken from https://stackoverflow.com/questions/55962146/remove-line-breaks-and-spaces-around-span-elements-with-python-regex
# And changed to avoid putting ALL one line, and only manage <attribute>, as it's the only one messing stuff here
# Kinda ugly to use the 3 types of tags but tbh I keep it like this while I have no time for a regex replace keeping the name="x" :p
def prettify_output(html):
    """Undo BeautifulSoup's three-line rendering of <attribute> and <field>
    tags (opening tag / text / closing tag) by gluing the text to its tags.

    Only <attribute name="..."> for NEW_ATTRS entries and simple
    <field name="..."> tags are collapsed, as those are the only ones this
    script rewrites.
    """
    for attr in NEW_ATTRS:
        html = re.sub(f'<attribute name="{attr}">[ \n]+', f'<attribute name="{attr}">', html)
    # The closing-tag cleanup does not depend on the attribute name, so run it
    # once instead of once per NEW_ATTRS entry — same result, less work.
    # (assumes NEW_ATTRS is non-empty, as it is in this script)
    html = re.sub(r'[ \n]+</attribute>', '</attribute>', html)
    html = re.sub(r'<field name="([a-z_]+)">[ \n]+', r'<field name="\1">', html)
    html = re.sub(r'[ \n]+</field>', r'</field>', html)
    return html
# Interactive driver: walk every collected XML file, convert old-style
# `attrs`/`states` usages to the new standalone attributes, show a diff-like
# preview, and (after confirmation) rewrite the file in place.
autoreplace = input('Do you want to auto-replace attributes ? (y/n) (empty == no) (will not ask confirmation for each file) : ') or 'n'
nofilesfound = True
ok_files = []
nok_files = []
for xml_file in all_xml_files:
    try:
        with open(xml_file, 'rb') as f:
            contents = f.read().decode('utf-8')
            f.close()
        if not 'attrs' in contents and not 'states' in contents:
            continue
        # Protect %(xmlid)d placeholders from the XML parser by swapping them
        # for unique 'REPLACEMEn' strings; they are restored before writing.
        counter_for_percent_d_replace = 1
        percent_d_results = {}
        for percent_d in percent_d_regex.findall(contents):
            contents = contents.replace(percent_d, "'REPLACEME%s'" % counter_for_percent_d_replace)
            percent_d_results[counter_for_percent_d_replace] = percent_d
            counter_for_percent_d_replace += 1
        soup = bs(contents, 'xml')
        # Four conversion targets: direct attrs="", <attribute name="attrs">,
        # direct states="", and <attribute name="states"> (inherited views).
        tags_with_attrs = soup.select('[attrs]')
        attribute_tags_name_attrs = soup.select('attribute[name="attrs"]')
        tags_with_states = soup.select('[states]')
        attribute_tags_name_states = soup.select('attribute[name="states"]')
        if not (tags_with_attrs or attribute_tags_name_attrs or\
                tags_with_states or attribute_tags_name_states):
            continue
        print('\n################################################################')
        print('##### Taking care of file -> %s' % xml_file)
        print('\n########### Current tags found ###\n')
        for t in tags_with_attrs + attribute_tags_name_attrs + tags_with_states + attribute_tags_name_states:
            print(t)
        nofilesfound = False
        # Management of tags that have attrs=""
        for tag in tags_with_attrs:
            attrs = tag['attrs']
            new_attrs = get_new_attrs(attrs)
            del tag['attrs']
            for new_attr in new_attrs.keys():
                tag[new_attr] = new_attrs[new_attr]
        # Management of attributes name="attrs"
        attribute_tags_after = []
        for attribute_tag in attribute_tags_name_attrs:
            new_attrs = get_new_attrs(attribute_tag.text)
            for new_attr in new_attrs.keys():
                # One <attribute name="..."> element per converted key.
                new_tag = soup.new_tag('attribute')
                new_tag['name'] = new_attr
                new_tag.append(str(new_attrs[new_attr]))
                attribute_tags_after.append(new_tag)
                attribute_tag.insert_after(new_tag)
            attribute_tag.decompose()
        # Management ot tags that have states=""
        for state_tag in tags_with_states:
            base_invisible = ''
            # Merge with an already-converted invisible expression, if any.
            if 'invisible' in state_tag.attrs and state_tag['invisible']:
                base_invisible = state_tag['invisible']
                if not (base_invisible.endswith('or') or base_invisible.endswith('and')):
                    base_invisible = base_invisible + ' or '
                else:
                    base_invisible = base_invisible + ' '
            invisible_attr = "state not in [%s]" % ','.join(("'" + state.strip() + "'") for state in state_tag['states'].split(','))
            state_tag['invisible'] = base_invisible + invisible_attr
            del state_tag['states']
        # Management of attributes name="states"
        attribute_tags_states_after = []
        for attribute_tag_states in attribute_tags_name_states:
            states = attribute_tag_states.text
            existing_invisible_tag = False
            # I don't know why, looking for attribute[name="invisible"] does not work,
            # but if it exists, I can find it with findAll attribute -> loop to name="invisible"
            for tag in attribute_tag_states.parent.findAll('attribute'):
                if tag['name'] == 'invisible':
                    existing_invisible_tag = tag
                    break
            if not existing_invisible_tag:
                existing_invisible_tag = soup.new_tag('attribute')
                existing_invisible_tag['name'] = 'invisible'
            if existing_invisible_tag.text:
                states_to_add = 'state not in [%s]' % (
                    ','.join(("'" + state.strip() + "'") for state in states.split(','))
                )
                if existing_invisible_tag.text.endswith('or') or existing_invisible_tag.text.endswith('and'):
                    new_invisible_text = '%s %s' % (existing_invisible_tag.text, states_to_add)
                else:
                    new_invisible_text = ' or '.join([existing_invisible_tag.text, states_to_add])
            else:
                new_invisible_text = 'state not in [%s]' % (
                    ','.join(("'" + state.strip() + "'") for state in states.split(','))
                )
            existing_invisible_tag.string = new_invisible_text
            attribute_tag_states.insert_after(existing_invisible_tag)
            attribute_tag_states.decompose()
            attribute_tags_states_after.append(existing_invisible_tag)
        print('\n########### Will be replaced by ###\n')
        for t in tags_with_attrs + attribute_tags_after + tags_with_states + attribute_tags_states_after:
            print(t)
        print('################################################################\n')
        if autoreplace.lower()[0] == 'n':
            confirm = input('Do you want to replace? (y/n) (empty == no) : ') or 'n'
        else:
            confirm = 'y'
        if confirm.lower()[0] == 'y':
            with open(xml_file, 'wb') as rf:
                # xml_4indent_formatter is defined earlier in this script.
                html = soup.prettify(formatter=xml_4indent_formatter)
                html = prettify_output(html)
                # Restore the %(xmlid)d placeholders swapped out above.
                for percent_d_result in percent_d_results.keys():
                    html = html.replace("'REPLACEME%s'" % percent_d_result, percent_d_results[percent_d_result])
                rf.write(html.encode('utf-8'))
            ok_files.append(xml_file)
    except Exception as e:
        # Best-effort batch tool: record the failure and keep going.
        nok_files.append((xml_file, e))
# Final report: which files were rewritten and which failed (and why).
print('\n################################################')
print('################## Run Debug ##################')
print('################################################')
if nofilesfound:
    print('No XML Files with "attrs" or "states" found in dir "%s"' % root_dir)
print('Succeeded on files')
for file in ok_files:
    print(file)
if not ok_files:
    print('No files')
print('')
print('Failed on files')
for file in nok_files:
    print(file[0])
    print('Reason: ', file[1])
if not nok_files:
    print('No files')

View File

@ -1,516 +0,0 @@
#!/usr/bin/env python
"""
Checks versions from the requirements files against distribution-provided
versions, taking distribution's Python version in account e.g. if checking
against a release which bundles Python 3.5, checks the 3.5 version of
requirements.
* only shows requirements for which at least one release diverges from the
matching requirements version
* empty or green cells mean that specific release matches its requirement (happens when
checking multiple releases: one of the other releases may mismatch its
requirements, necessitating showing the row)
This script was heavily reworked but is not in a final version:
TODO:
- add legends
- better management of cache
- add meta info on cells (mainly to generate a better html report)
- warn/ko reason
- wheel + link
- original debian package name + link
...
"""
import argparse
import gzip
import itertools
import json
import os
import re
import shutil
import tempfile
try:
import ansitoimg
except ImportError:
ansitoimg = None
from abc import ABC, abstractmethod
from pathlib import Path
from sys import stderr, stdout
from typing import Dict, List, Optional, Tuple
from urllib.request import HTTPError
from urllib.request import urlopen as _urlopen
from packaging.markers import Marker
from packaging.requirements import Requirement
from packaging.tags import mac_platforms # noqa: PLC2701
from packaging.utils import canonicalize_name
from pip._internal.index.package_finder import (
LinkEvaluator, # noqa: PLC2701
)
from pip._internal.models.link import Link # noqa: PLC2701
from pip._internal.models.target_python import TargetPython # noqa: PLC2701
# A parsed version: a tuple of ints, e.g. (1, 2, 3).
Version = Tuple[int, ...]
# PyPI name -> distro source-package name remapping, shared between debian and ubuntu
SPECIAL = {
    'pytz': 'tz',
    'libsass': 'libsass-python',
}
# Output formats understood by -f/--format (and inferred from -o extensions).
SUPPORTED_FORMATS = ('txt', 'ansi', 'svg', 'html', 'json')
# sys.platform codes checked against requirement markers, and their
# matching column display names.
PLATFORM_CODES = ('linux', 'win32', 'darwin')
PLATFORM_NAMES = ('Linux', 'Win', 'OSX')
def urlopen(url):
    """Fetch *url* and cache the raw body under /tmp/package_versions_cache.

    Subsequent calls for the same URL are served from disk. Returns an open
    binary file object over the cached bytes.
    """
    cache_dir = '/tmp/package_versions_cache/'
    os.makedirs(cache_dir, exist_ok=True)
    # Sanitise the URL into a flat, filesystem-safe cache key.
    safe_name = ''.join(ch if ch.isalnum() else '_' for ch in url)
    cached_path = f'{cache_dir}{safe_name}'
    if not os.path.isfile(cached_path):
        with open(cached_path, 'wb') as fw:
            fw.write(_urlopen(url).read())
    return open(cached_path, 'rb')  # noqa: SIM115
def parse_version(vstring: str) -> Optional[Version]:
    """Parse a dotted version like '1.2.3' into (1, 2, 3); None when empty."""
    return tuple(int(part) for part in vstring.split('.')) if vstring else None
def cleanup_debian_version(s: str) -> str:
    """Strip Debian packaging noise from a version string.

    Drops an optional ``epoch:`` prefix and everything from the first
    ``~``, ``+``, ``-`` or ``.dfsg`` onwards.

    BUG FIX: the original indexed the match object unconditionally, so a
    version with no such marker at all (native packages, e.g. "1.2")
    raised TypeError. Such versions are now returned as-is, minus any epoch.
    """
    m = re.match(r'''
        (?:\d+:)?       # debian epoch prefix
        (.*?)           # the upstream version we actually want
        (?:~|\+|-|\.dfsg)
        .*
    ''', s, flags=re.VERBOSE)
    if m:
        return m[1]
    # No revision/suffix marker: just drop a possible epoch.
    return re.sub(r'^\d+:', '', s)
class PipPackage:
    """Metadata for one PyPI project, fetched (and disk-cached) via the
    pypi.org JSON API, with wheel-availability queries built on pip's
    internal link evaluation."""

    def __init__(self, name):
        self.name = name
        # urlopen() is the caching helper above, so repeated runs hit disk.
        infos = json.load(urlopen(f'https://pypi.org/pypi/{name}/json'))
        self.info = infos['info']
        self.last_serial = infos['last_serial']
        self.releases = infos['releases']
        self.urls = infos['urls']
        self.vulnerabilities = infos['vulnerabilities']

    def has_wheel_for(self, version, python_version, platform):
        """Report wheel availability for (version, python_version, platform).

        Returns a triple of booleans:
        (wheel for this exact version, any wheel at all in this version,
         wheel in this or a newer version).
        """
        if version is None:
            return (False, False, False)
        py_version_info = python_version.split('.')
        if len(py_version_info) == 2:
            # TargetPython wants a (major, minor, micro) triple.
            py_version_info = (py_version_info[0], py_version_info[1], 0)
        releases = self.releases
        has_wheel_for_version = False
        has_any_wheel = False
        has_wheel_in_another_version = False
        # None means "no platform restriction" for TargetPython (linux case).
        platforms = None
        if platform == 'darwin':
            platforms = list(mac_platforms((15, 0), 'x86_64'))
        elif platform == 'win32':
            platforms = ['win32', 'win-amd64']
        else:
            assert platform == 'linux'
        target_python = TargetPython(
            platforms=platforms,
            py_version_info=py_version_info,
            abis=None,
            implementation=None,
        )
        le = LinkEvaluator(
            project_name=self.name,
            canonical_name=canonicalize_name(self.name),
            formats={"binary", "source"},
            target_python=target_python,
            allow_yanked=True,
            ignore_requires_python=False,
        )
        for release in releases[version]:
            if release['filename'].endswith('.whl'):
                has_any_wheel = True
            is_candidate, _result = le.evaluate_link(Link(
                comes_from=None,
                url=release['url'],
                requires_python=release['requires_python'],
                yanked_reason=release['yanked_reason'],
            ))
            if is_candidate:
                if release['filename'].endswith('.whl'):
                    has_wheel_for_version = has_wheel_in_another_version = True
                    break
        if not has_wheel_for_version and has_any_wheel:
            # TODO, we should prefer a version matching the one from a distro
            for rel_version, rel in releases.items():
                for release in rel:
                    if not release['filename'].endswith('.whl'):
                        continue
                    # Skip pre-releases (non-numeric parts) and anything not
                    # strictly newer than the requested version.
                    if any(not s.isdigit() for s in rel_version.split('.')) or parse_version(rel_version) <= parse_version(version):
                        continue
                    is_candidate, _result = le.evaluate_link(Link(
                        comes_from=None,
                        url=release['url'],
                        requires_python=release['requires_python'],
                        yanked_reason=release['yanked_reason'],
                    ))
                    if is_candidate:
                        has_wheel_in_another_version = True
                        stderr.write(f'WARNING: Wheel found for {self.name} ({python_version} {platform}) in {rel_version}\n')
                        # First newer candidate wheel is enough: bail out.
                        return (has_wheel_for_version, has_any_wheel, has_wheel_in_another_version)
        return (has_wheel_for_version, has_any_wheel, has_wheel_in_another_version)
class Distribution(ABC):
    """Base class for per-distribution package-version lookups."""

    def __init__(self, release):
        self._release = release

    @abstractmethod
    def get_version(self, package: str) -> Optional[Version]:
        """Return the distribution's version of *package*, if packaged."""

    def __str__(self):
        return f'{type(self).__name__.lower()} {self._release}'

    @classmethod
    def get(cls, name):
        """Resolve *name* (e.g. 'debian') to the concrete subclass."""
        for subclass in cls.__subclasses__():
            if subclass.__name__.lower() == name:
                return subclass
        raise ValueError(f"Unknown distribution {name!r}")
class Debian(Distribution):
    def get_version(self, package):
        """ Try to find which version of ``package`` is in Debian release {release}
        """
        package = SPECIAL.get(package, package)
        # try the python prefix first: some packages have a native or foreign $X and
        # either the bindings or a python equivalent at python-X, or just a name
        # collision
        prefixes = ['python-', '']
        if package.startswith('python'):
            prefixes = ['']
        for prefix in prefixes:
            try:
                res = json.load(urlopen(f'https://sources.debian.org/api/src/{prefix}{package}/'))
            except HTTPError:
                # NOTE(review): returns the *string* 'failed' instead of a
                # version tuple or None — callers '.'.join over it, producing
                # odd output; confirm whether this sentinel is intentional.
                return 'failed'
            if res.get('error') is None:
                break
        if res.get('error'):
            return
        try:
            # First 'main'-area version whose suites include our release.
            return next(
                parse_version(cleanup_debian_version(distr['version']))
                for distr in res['versions']
                if distr['area'] == 'main'
                if self._release.lower() in distr['suites']
            )
        except StopIteration:
            return
class Ubuntu(Distribution):
    """ Ubuntu doesn't have an API, instead it has a huge text file
    """
    def __init__(self, release):
        super().__init__(release)
        # source-package name -> raw version string, parsed once up front.
        self._packages = {}
        # ideally we should request the proper Content-Encoding but PUC
        # apparently does not care, and returns a somewhat funky
        # content-encoding (x-gzip) anyway
        data = gzip.open(
            urlopen(f'https://packages.ubuntu.com/source/{release}/allpackages?format=txt.gz'),
            mode='rt', encoding='utf-8',
        )
        for line in itertools.islice(data, 6, None):  # first 6 lines is garbage header
            # ignore the restricted, security, universe, multiverse tags
            m = re.match(r'(\S+) \(([^)]+)\)', line.strip())
            assert m, f"invalid line {line.strip()!r}"
            self._packages[m[1]] = m[2]

    def get_version(self, package):
        """Look up *package* (trying python3-/python- prefixes first)."""
        package = SPECIAL.get(package, package)
        for prefix in ['python3-', 'python-', '']:
            v = self._packages.get(f'{prefix}{package}')
            if v:
                return parse_version(cleanup_debian_version(v))
        return None
def _strip_comment(line):
return line.split('#', 1)[0].strip()
def parse_requirements(reqpath: Path) -> Dict[str, List[Tuple[str, Marker]]]:
    """ Parses a requirement file to a dict of {package: [(version, markers)]}

    The env markers express *whether* that specific dep applies.
    """
    requirements_by_name = {}
    with reqpath.open('r', encoding='utf-8') as handle:
        for raw_line in handle:
            line = _strip_comment(raw_line)
            if not line:
                continue
            req = Requirement(line)
            pinned = None
            if req.specifier:
                if len(req.specifier) > 1:
                    raise NotImplementedError('multi spec not supported yet')
                pinned = next(iter(req.specifier)).version
            requirements_by_name.setdefault(req.name, []).append((pinned, req.marker))
    return requirements_by_name
def ok(text):
    """Render *text* in green (requirement satisfied)."""
    return '\033[92m%s\033[39m' % text


def em(text):
    """Render *text* in blue (emphasis: differs across versions)."""
    return '\033[94m%s\033[39m' % text


def warn(text):
    """Render *text* in yellow (suspicious mismatch)."""
    return '\033[93m%s\033[39m' % text


def ko(text):
    """Render *text* in red (mismatch)."""
    return '\033[91m%s\033[39m' % text


def default(text):
    """Pass *text* through uncolored."""
    return text
def main(args):
    """Build and emit the requirements-vs-distributions comparison table.

    *args* is the parsed CLI namespace (release, all, output, format,
    check_pypi, filter). Output goes to ``args.output`` when given,
    otherwise stdout; progress goes to stderr.
    """
    checkers = [
        Distribution.get(distro)(release)
        for version in args.release
        for (distro, release) in [version.split(':')]
    ]
    stderr.write("Fetch Python versions...\n")
    # Python version bundled by each release, e.g. '3.11'.
    pyvers = [
        '.'.join(map(str, checker.get_version('python3-defaults')[:2]))
        for checker in checkers
    ]
    uniq = sorted(set(pyvers), key=parse_version)
    platforms = PLATFORM_NAMES if args.check_pypi else PLATFORM_NAMES[:1]
    platform_codes = PLATFORM_CODES if args.check_pypi else PLATFORM_CODES[:1]
    platform_headers = ['']
    python_headers = ['']
    table = [platform_headers, python_headers]
    # requirements headers
    for v in uniq:
        for p in platforms:
            platform_headers.append(p)
            python_headers.append(v)
    # distro headers
    for checker, version in zip(checkers, pyvers):
        platform_headers.append(checker._release[:5])
        python_headers.append(version)
    reqs = parse_requirements((Path.cwd() / __file__).parent.parent / 'requirements.txt')
    if args.filter:
        reqs = {r: o for r, o in reqs.items() if any(f in r for f in args.filter.split(','))}
    for req, options in reqs.items():
        if args.check_pypi:
            pip_infos = PipPackage(req)
        row = [req]
        seps = [' || ']
        byver = {}
        for pyver in uniq:
            # FIXME: when multiple options apply, check which pip uses
            # (first-matching. best-matching, latest, ...)
            seps[-1] = ' || '
            for platform in platform_codes:
                platform_version = 'none'
                for version, markers in options:
                    if not markers or markers.evaluate({
                        'python_version': pyver,
                        'sys_platform': platform,
                    }):
                        if platform == 'linux':
                            byver[pyver] = version
                        platform_version = version
                        break
                deco = None
                if args.check_pypi:
                    if platform_version == 'none':
                        deco = 'ok'
                    else:
                        has_wheel_for_version, has_any_wheel, has_wheel_in_another_version = pip_infos.has_wheel_for(platform_version, pyver, platform)
                        if has_wheel_for_version:
                            deco = 'ok'
                        elif has_wheel_in_another_version:
                            deco = 'ko'
                        elif has_any_wheel:
                            deco = 'warn'
                if deco in ("ok", None):
                    # Highlight versions diverging from the linux baseline.
                    if byver.get(pyver, 'none') != platform_version:
                        deco = 'em'
                req_ver = platform_version or 'any'
                row.append((req_ver, deco))
                seps.append(' | ')
            seps[-1] = ' |#| '
        # this requirement doesn't apply, ignore
        if not byver and not args.all:
            continue
        for i, c in enumerate(checkers):
            req_version = byver.get(pyvers[i], 'none') or 'any'
            # NOTE(review): Debian.get_version can return the string
            # 'failed', which '.'.join spells out oddly here — confirm.
            check_version = '.'.join(map(str, c.get_version(req.lower()) or [])) or None
            if req_version != check_version:
                deco = 'ko'
                if req_version == 'none':
                    deco = 'ok'
                elif req_version == 'any':
                    if check_version is None:
                        deco = 'ok'
                elif check_version is None:
                    deco = 'ko'
                elif parse_version(req_version) >= parse_version(check_version):
                    deco = 'warn'
                row.append((check_version or '</>', deco))
            elif args.all:
                row.append((check_version or '</>', 'ok'))
            else:
                row.append('')
            seps.append(' |#| ')
        table.append(row)
        seps[-1] = ' '  # remove last column separator
    stderr.write('\n')
    # evaluate width of columns
    sizes = [0] * len(table[0])
    for row in table:
        sizes = [
            max(s, len(cell[0] if isinstance(cell, tuple) else cell))
            for s, cell in zip(sizes, row)
        ]
    output_format = 'ansi'
    if args.format:
        output_format = args.format
        # BUG FIX: the original `assert format in SUPPORTED_FORMATS` tested
        # the *builtin* `format` function, so any explicit -f/--format always
        # crashed with AssertionError. Validate the requested format instead,
        # with a real exception (asserts disappear under python -O).
        if output_format not in SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported format {output_format!r}")
    elif args.output:
        output_format = 'txt'
        ext = args.output.split('.')[-1]
        if ext in SUPPORTED_FORMATS:
            output_format = ext
    if output_format == 'json':
        output = json.dumps(table)
    else:
        output = ''
        # format table
        for row in table:
            output += ' '
            for cell, width, sep in zip(row, sizes, seps):
                cell_content = cell
                deco = default
                if isinstance(cell, tuple):
                    cell_content, level = cell
                    if output_format == 'txt' or level is None:
                        deco = default
                    elif level == 'ok':
                        deco = ok
                    elif level == 'em':
                        deco = em
                    elif level == 'warn':
                        deco = warn
                    else:
                        deco = ko
                output += deco(f'{cell_content:<{width}}') + sep
            output += '\n'
        if output_format in ('svg', 'html'):
            if not ansitoimg:
                output_format = 'ansi'
                stderr.write(f'Missing ansitoimg for {output_format} format, switching to ansi')
            else:
                convert = ansitoimg.ansiToSVG
                if output_format == 'html':
                    convert = ansitoimg.ansiToHTML
                with tempfile.NamedTemporaryFile() as tmp:
                    convert(output, tmp.name, width=(sum(sizes) + sum(len(sep) for sep in seps)), title='requirements-check.py')
                    output = tmp.read().decode()
                # strip the mac-style window bullets ansitoimg draws
                output = output.replace('''<g transform="translate(26,22)">
<circle cx="0" cy="0" r="7" fill="#ff5f57"/>
<circle cx="22" cy="0" r="7" fill="#febc2e"/>
<circle cx="44" cy="0" r="7" fill="#28c840"/>
</g>''', "")
    if args.output:
        with open(args.output, 'w', encoding='utf8') as f:
            f.write(output)
    else:
        stdout.write(output)
if __name__ == '__main__':
    # CLI entry point: parse arguments, optionally clear the download cache,
    # then build the comparison table.
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        'release', nargs='+',
        help="Release to check against, should use the format '{distro}:{release}' e.g. 'debian:sid'"
    )
    parser.add_argument(
        '-a', '--all', action="store_true",
        help="Display all requirements even if it matches",
    )
    parser.add_argument(
        '-o', '--output', help="output path",
    )
    parser.add_argument(
        '-f', '--format', help=f"Supported format: {', '.join(SUPPORTED_FORMATS)}",
    )
    parser.add_argument(
        '--update-cache', action="store_true",
        help="Ignore the existing package version cache and update them",
    )
    parser.add_argument(
        '--check-pypi', action="store_true",
        help="Check wheel packages",
    )
    parser.add_argument(
        '--filter',
        # typo fix in user-facing help: "sepaated" -> "separated"
        help="Comma separated list of packages to check",
    )
    args = parser.parse_args()
    if args.update_cache:
        shutil.rmtree('/tmp/package_versions_cache/')
    main(args)

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
odoorpc==0.9.0
GitPython==3.1.43
PyYAML==6.0.1
tqdm

View File

@ -1,102 +0,0 @@
#!/usr/bin/env python
import shutil
import odoorpc
import color_log
import argparse
import sys
import base64
import os
from datetime import datetime
# Default configuration for the Odoo connection; all overridable via CLI flags.
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
# Directory where <db_name>.zip backup archives are looked up.
RESTORE_DIR = "odoo_backups"
# Severity codes understood by color_log.Show.
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
    """Connect to the Odoo server and list its databases.

    Exits the process (status 1) on any connection/RPC failure.
    Note: the per-database login is intentionally commented out — restoring
    only needs the master password, passed later to the db service.
    """
    try:
        odoo = odoorpc.ODOO(args.host, port=args.port)
        color_log.Show(INFO, f"Available databases: {odoo.db.list()}")
        # odoo.login(args.db_name, args.username, args.password)
        # color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
        return odoo
    except odoorpc.error.RPCError as e:
        color_log.Show(FAIL, f"Login failed: {e}")
        sys.exit(1)
    except Exception as e:
        color_log.Show(FAIL, f"Connection error: {e}")
        sys.exit(1)
def parse_arguments() -> argparse.Namespace:
    """Parse and validate command-line arguments."""
    parser = argparse.ArgumentParser(description="restore all Odoo databases.")
    # (flags, add_argument keyword options) pairs, registered in order.
    options = (
        (("--host",), {"default": DEFAULT_HOST, "help": "Odoo server host"}),
        (("--port",), {"type": int, "default": DEFAULT_PORT, "help": "Odoo server port"}),
        (("--admin-password",), {"required": True, "help": "Odoo master admin password"}),
        (
            ("--database",),
            {
                "nargs": "*",
                "help": "Specific databases to restore (leave empty to restore all databases)",
            },
        ),
    )
    for flags, kwargs in options:
        parser.add_argument(*flags, **kwargs)
    return parser.parse_args()
def restore_database(odoo: odoorpc.ODOO, db_name: str, admin_password: str):
    """Restore a single Odoo database from RESTORE_DIR/<db_name>.zip.

    Args:
        odoo: An established odoorpc connection.
        db_name: Target database name (also the backup file stem).
        admin_password: Odoo master password required by the restore RPC.

    Failures are reported on stdout rather than raised, so a batch restore
    can continue with the remaining databases.
    """
    try:
        backup_path = os.path.join(RESTORE_DIR, f"{db_name}.zip")
        if not os.path.exists(backup_path):
            print(f"Backup file for {db_name} not found: {backup_path}")
            return
        with open(backup_path, "rb") as f:
            print(f"Restoring database: {db_name} from {backup_path}...")
            timeout_backup = odoo.config['timeout']
            odoo.config['timeout'] = 7200  # large dumps take a while: 2 hours
            try:
                odoo.db.restore(admin_password, db_name, f)
            finally:
                # BUG FIX: restore the original timeout even when the RPC
                # fails; previously a failed restore left the 2h timeout
                # applied to every subsequent call on this connection.
                odoo.config['timeout'] = timeout_backup
            print(f"Database {db_name} restored successfully.")
    except Exception as e:
        print(f"Failed to restore {db_name}: {e}")
def restore_all_databases(odoo: odoorpc.ODOO, admin_password: str):
    """Restore all databases from backup files in the restore directory."""
    try:
        archives = [name for name in os.listdir(RESTORE_DIR) if name.endswith(".zip")]
        print("Backup files found:", archives)
        for archive in archives:
            # The database name is the archive file name minus its extension.
            restore_database(odoo, os.path.splitext(archive)[0], admin_password)
    except Exception as exc:
        print(f"Error restoring databases: {exc}")
def main():
    """Main execution flow: connect, then restore the requested databases
    (or every backup found when --database is omitted)."""
    args = parse_arguments()
    odoo = connect_to_odoo(args)
    if args.database:
        for db_name in args.database:
            restore_database(odoo, db_name, args.admin_password)
    else:
        restore_all_databases(odoo, args.admin_password)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: report cleanly instead of dumping a traceback.
        color_log.Show(FAIL, "\nOperation cancelled by user")
        sys.exit(1)

0
scripts/__init__.py Normal file
View File

0
clean_up_addons.sh → scripts/clean_up_addons.sh Executable file → Normal file
View File

0
dir2file.sh → scripts/dir2file.sh Executable file → Normal file
View File

0
gen_config_docker.py → scripts/gen_config_docker.py Executable file → Normal file
View File

0
init_config.sh → scripts/init_config.sh Executable file → Normal file
View File

0
migrate-lang.sh → scripts/migrate-lang.sh Executable file → Normal file
View File

0
modules_scan.sh → scripts/modules_scan.sh Executable file → Normal file
View File

99
scripts/update_modules.py Normal file
View File

@ -0,0 +1,99 @@
import importlib.util
import argparse
import os
import subprocess
import sys
from ..services import config as Config
from ..lib import color_log
def run_command(cmd, description):
    """Run *cmd* (an argv list), echo its output, and report success.

    Args:
        cmd: Command and arguments, e.g. ["python", "utility/main.py", ...].
        description: Human-readable label used in log messages.

    Returns:
        bool: True when the process exited with status 0, False otherwise
        (including when the command could not be started at all).
    """
    try:
        color_log.Show("INFO", f"Executing: {description}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.stdout:
            print(result.stdout.strip())
        if result.stderr:
            print(result.stderr.strip())
        return result.returncode == 0
    except (OSError, subprocess.SubprocessError) as e:
        # BUG FIX: without check=True, subprocess.run never raises
        # CalledProcessError, so the original handler was dead code. What can
        # actually be raised is OSError (e.g. command not found) or another
        # SubprocessError; CalledProcessError subclasses SubprocessError, so
        # the previous contract is preserved.
        color_log.Show("FAILED", f"Error executing {description}: {str(e)}")
        return False
def update_instance(instance_name, action, force_pull=False):
    """Update a single instance with git pull, module update, and service restart."""
    utility = ["python", "utility/main.py"]
    pull_cmd = utility + ["git", "pull", instance_name]
    if force_pull:
        pull_cmd = pull_cmd + ["--force"]
    # Ordered pipeline: (command, progress label, warning on failure).
    # The first failing step aborts the whole update.
    steps = (
        (pull_cmd,
         f"Pulling latest code for {instance_name}",
         f"Skipping module update for {instance_name} due to pull failure"),
        (utility + ["module", action, instance_name],
         f"Updating modules for {instance_name}",
         f"Module update failed for {instance_name}"),
        (utility + ["service", "restart", instance_name],
         f"Restarting service for {instance_name}",
         f"Service restart failed for {instance_name}"),
    )
    for cmd, description, failure_message in steps:
        if not run_command(cmd, description):
            color_log.Show("WARNING", failure_message)
            return False
    return True
def main():
    """CLI entry point: run the requested module action (install/upgrade/
    uninstall) on every instance listed in the YAML config, then print a
    summary of successes and failures."""
    # Parse arguments
    parser = argparse.ArgumentParser(description="Update modules for all instances")
    parser.add_argument(
        "action",
        help="Action to perform",
        choices=["uninstall", "install", "upgrade"]
    )
    parser.add_argument(
        "config_path",
        help="Path to the config file"
    )
    parser.add_argument(
        "--force-pull",
        action="store_true",
        help="Force pull with hard reset (discards local changes)"
    )
    args = parser.parse_args()
    # Load the configuration
    color_log.Show("INFO", f"Loading configuration from {args.config_path}")
    config = Config.Config(config_path=args.config_path)
    # Get instances
    instances = config.get_instances()
    if not isinstance(instances, list):
        color_log.Show("FAILED", "Error: instances is not a valid list.")
        sys.exit(1)
    # Process each instance; a failure on one does not stop the others.
    success_count = 0
    for instance in instances:
        if "name" not in instance:
            color_log.Show("WARNING", f"Instance missing 'name' field. Skipping.")
            continue
        instance_name = instance["name"]
        color_log.Show("INFO", f"\nProcessing instance: {instance_name}")
        if update_instance(instance_name, args.action, args.force_pull):
            success_count += 1
            color_log.Show("OK", f"Successfully updated {instance_name}")
        else:
            color_log.Show("FAILED", f"Failed to update {instance_name}")
    # Summary
    color_log.Show("INFO", f"\nUpdate Summary:")
    color_log.Show("INFO", f"Total instances: {len(instances)}")
    color_log.Show("INFO", f"Successful updates: {success_count}")
    color_log.Show("INFO", f"Failed updates: {len(instances) - success_count}")


if __name__ == "__main__":
    main()

0
services/__init__.py Normal file
View File

37
services/config.py Normal file
View File

@ -0,0 +1,37 @@
import os
import yaml
class Config:
    """Thin wrapper around the YAML settings file used by the utility."""

    def __init__(self, config_path="config/settings.yaml"):
        self.config_path = config_path
        self.settings = self.load_config()

    def load_config(self):
        """Read and parse the YAML file; raise FileNotFoundError if absent."""
        if not os.path.exists(self.config_path):
            raise FileNotFoundError(f"Config file not found at {self.config_path}")
        with open(self.config_path, "r") as stream:
            return yaml.safe_load(stream)

    def get(self, section, key, default=None):
        """Return settings[section][key], or *default* when either is missing."""
        section_data = self.settings.get(section, {})
        return section_data.get(key, default)

    def get_instances(self):
        """Return the list of Odoo instances."""
        return self.settings.get("odoo_instances", [])

    def get_instance(self, name):
        """Return the instance dict whose "name" equals *name*, else None."""
        matches = (item for item in self.get_instances() if item.get("name") == name)
        return next(matches, None)

0
services/git/__init__.py Normal file
View File

233
services/git/handler.py Normal file
View File

@ -0,0 +1,233 @@
import git
import os
from services.odoo.connection import OdooConnection
import lib.color_log as color_log
import subprocess
from urllib.parse import urlparse
class GitHandler:
    """Run git operations for configured Odoo instances, either locally or
    over SSH, driven by the YAML instance configuration."""

    def __init__(self, config_path="config/settings.yaml"):
        self.config = OdooConnection(config_path)
        self.local_path = None  # Will be set based on instance configuration

    def _execute_command(self, cmd, instance_name):
        """Execute a shell command, echo its output, and return its stdout.

        Raises subprocess.CalledProcessError on a non-zero exit status.
        """
        try:
            color_log.Show("INFO", f"Executing command: {cmd}")
            result = subprocess.run(
                cmd, shell=True, check=True, capture_output=True, text=True
            )
            # Print the output if there is any
            if result.stdout:
                print(result.stdout.strip())
            if result.stderr:
                print(result.stderr.strip())
            color_log.Show("OK", f"Command executed successfully for {instance_name}")
            return result.stdout
        except subprocess.CalledProcessError as e:
            # Print the error output
            if e.stdout:
                print(e.stdout.strip())
            if e.stderr:
                print(e.stderr.strip())
            color_log.Show(
                "FAILED",
                f"Error performing git operation for {instance_name}: {e}",
            )
            raise

    def _get_auth_url(self, repo_url, instance):
        """Add authentication to repository URL if credentials are provided.

        NOTE(review): this embeds the password in the URL (visible in remote
        process listings and shell history); a credential helper would be
        safer — confirm before hardening.
        """
        if not repo_url:
            return repo_url
        git_config = instance.get("git", {})
        username = git_config.get("username")
        password = git_config.get("password")
        if username and password:
            parsed_url = urlparse(repo_url)
            # Replace the URL with authenticated version
            auth_url = f"{parsed_url.scheme}://{username}:{password}@{parsed_url.netloc}{parsed_url.path}"
            return auth_url
        return repo_url

    def _get_remote_command(self, instance, cmd):
        """Wrap *cmd* in an ssh invocation when the instance host is remote;
        return it unchanged for localhost."""
        host = instance["host"]
        ssh_settings = instance.get("ssh", {})
        ssh_user = ssh_settings.get("user", "root")
        ssh_key_path = ssh_settings.get("key_path")
        local_host = host in ["localhost", "127.0.0.1"]
        if not local_host:
            if not ssh_key_path:
                return f"ssh -t {ssh_user}@{host} 'sudo -s bash -c \"{cmd}\"'"
            else:
                return f"ssh -i {ssh_key_path} {ssh_user}@{host} 'sudo -s bash -c \"{cmd}\"'"
        return cmd

    def clone_or_open_repo(self, instance_name=None, repo_url=None, branch=None):
        """Clone or open repository with SSH support.

        Without *instance_name* the clone runs locally; otherwise the
        instance's configured host/credentials/local_path are used.
        Returns True on success, False on failure.
        """
        try:
            if not instance_name:
                # Local operation
                if not os.path.exists(self.local_path):
                    cmd = (
                        f"git clone -b {branch or 'main'} {repo_url} {self.local_path}"
                    )
                    self._execute_command(cmd, "local")
                return True
            # Remote operation
            instance = self.config.get_instance(instance_name)
            if not instance:
                raise ValueError(f"Instance {instance_name} not found")
            # Set local_path from instance configuration
            self.local_path = instance.get("git", {}).get("local_path")
            if not self.local_path:
                raise ValueError(
                    f"No local_path configured for instance {instance_name}"
                )
            # Get repository URL from instance configuration if not provided
            if not repo_url:
                repo_url = instance.get("git", {}).get("repo_url")
                if not repo_url:
                    raise ValueError(
                        f"No repository URL configured for instance {instance_name}"
                    )
            # Add authentication to repository URL
            auth_url = self._get_auth_url(repo_url, instance)
            # Check if repo exists
            check_cmd = (
                f"test -d {self.local_path}/.git && echo 'exists' || echo 'not exists'"
            )
            remote_check_cmd = self._get_remote_command(instance, check_cmd)
            result = self._execute_command(remote_check_cmd, instance_name)
            if "not exists" in result:
                # Clone repository with authentication
                clone_cmd = (
                    f"git clone -b {branch or 'main'} {auth_url} {self.local_path}"
                )
                remote_clone_cmd = self._get_remote_command(instance, clone_cmd)
                self._execute_command(remote_clone_cmd, instance_name)
            return True
        except Exception as e:
            color_log.Show("FAILED", f"Error in clone_or_open_repo: {e}")
            return False

    def pull_updates(self, instance_name=None, branch=None, force=False):
        """Pull updates with SSH support

        Args:
            instance_name (str, optional): Name of the instance
            branch (str, optional): Branch to pull from
            force (bool, optional): If True, will perform hard reset before pulling

        Returns:
            bool: True on success, False on failure.
        """
        try:
            if not instance_name:
                # Local operation
                if not self.local_path:
                    raise ValueError("local_path not set for local operation")
                cmd = f"git --git-dir={self.local_path}/.git --work-tree={self.local_path} pull origin {branch or 'main'}"
                self._execute_command(cmd, "local")
                return True
            # Remote operation
            instance = self.config.get_instance(instance_name)
            branch = instance.get("git", {}).get("branch") or branch
            if not instance:
                raise ValueError(f"Instance {instance_name} not found")
            # Set local_path from instance configuration
            self.local_path = instance.get("git", {}).get("local_path")
            if not self.local_path:
                raise ValueError(
                    f"No local_path configured for instance {instance_name}"
                )
            # Get repository URL and add authentication
            repo_url = instance.get("git", {}).get("repo_url")
            auth_url = self._get_auth_url(repo_url, instance)
            # Configure git to use credentials and set remote URL
            git_commands = [
                f"cd {self.local_path}",
                f"git config --local credential.helper store",
                f"git remote set-url origin {auth_url}",
            ]
            # Add force reset commands if force is True
            if force:
                git_commands.extend(
                    [
                        "git fetch origin",
                        f"git reset --hard origin/{branch or 'main'}",
                        "git clean -fd",  # Remove untracked files and directories
                    ]
                )
            else:
                git_commands.append(f"git pull origin {branch or 'main'}")
            # Combine commands and execute remotely
            combined_cmd = " && ".join(git_commands)
            remote_cmd = self._get_remote_command(instance, combined_cmd)
            self._execute_command(remote_cmd, instance_name)
            return True
        except Exception as e:
            color_log.Show("FAILED", f"Error pulling updates: {e}")
            return False

    def get_current_commit(self):
        """Return the HEAD commit sha of the wrapped repo, or None.

        BUG FIX: ``self.repo`` is never assigned anywhere in this class, so
        the original conditional dereferenced it first and always raised
        AttributeError; use getattr so the documented "None when no repo"
        behaviour actually holds.
        """
        repo = getattr(self, "repo", None)
        return repo.head.commit.hexsha if repo else None

    def _get_command(self, instance, action, repo_url=None, branch=None):
        """
        Generate the appropriate git command based on instance type and action.

        Args:
            instance (dict): Instance configuration
            action (str): Git action (clone/pull)
            repo_url (str, optional): Repository URL for clone operation
            branch (str, optional): Branch name

        Returns:
            str: Generated git command
        """
        host = instance["host"]
        ssh_settings = instance.get("ssh", {})
        ssh_user = ssh_settings.get("user", "root")
        ssh_key_path = ssh_settings.get("key_path")
        local_host = host in ["localhost", "127.0.0.1"]
        # Base git command
        if action == "clone":
            if not repo_url:
                raise ValueError("Repository URL is required for clone operation")
            cmd = f"git clone -b {branch or 'main'} {repo_url} {self.local_path}"
        elif action == "pull":
            cmd = f"git --git-dir={self.local_path}/.git --work-tree={self.local_path} pull origin {branch or 'main'}"
        else:
            raise ValueError(f"Unsupported git action: {action}")
        # Wrap with SSH if remote host
        if not local_host:
            if not ssh_key_path:
                cmd = f"ssh -t {ssh_user}@{host} 'sudo {cmd}'"
            else:
                cmd = f"ssh -i {ssh_key_path} {ssh_user}@{host} 'sudo {cmd}'"
        return cmd

View File

View File

@ -0,0 +1,99 @@
from odoorpc import ODOO
import os
import sys
from services.config import Config
import lib.color_log as color_log
class OdooConnection:
def __init__(self, config_path="config/settings.yaml", instance_name=None):
"""Initialize with a config file and optional instance name."""
self.config = Config(config_path)
self.instance_name = instance_name
self.instances = self._load_instances()
self.connections = {} # Store active Odoo connections
def _load_instances(self):
"""Load Odoo instances from the config."""
instances = self.config.get_instances()
if not instances:
raise ValueError("No Odoo instances found in the configuration.")
if self.instance_name:
# Filter for the specified instance
filtered = [i for i in instances if i["name"] == self.instance_name]
if not filtered:
raise ValueError(
f"Instance '{self.instance_name}' not found in configuration."
)
return filtered
return instances
def connect(self, instance_name=None):
"""Connect to a specific instance or all instances if none specified."""
target_instances = (
[i for i in self.instances if i["name"] == instance_name]
if instance_name
else self.instances
)
for instance in target_instances:
if instance["name"] in self.connections:
print(f"Using existing connection for {instance['name']}")
continue
try:
# Create Odoo RPC connection
odoo = ODOO(instance["host"], port=instance["port"])
odoo.login(
instance["database"], instance["username"], instance["password"]
)
self.connections[instance["name"]] = odoo
color_log.Show(
"OK",
f"Connected to {instance['name']} at {instance['host']}:{instance['port']}",
)
except Exception as e:
color_log.Show(
"FAILED",
f"Failed to connect to {instance['name']} at {instance['host']}:{instance['port']}: {e}",
)
raise
def get_connection(self, instance_name):
"""Get an existing connection for an instance, connecting if necessary."""
if instance_name not in self.connections:
self.connect(instance_name)
return self.connections.get(instance_name)
def disconnect(self, instance_name=None):
"""Disconnect from a specific instance or all instances."""
target_instances = [instance_name] if instance_name else self.connections.keys()
for name in target_instances:
if name in self.connections:
# odoorpc doesn't have an explicit disconnect, so we just remove the reference
del self.connections[name]
color_log.Show("OK", f"Disconnected from {name}")
def get_instances(self):
"""Return the list of configured instances."""
return self.instances
def get_instance(self,instance_name):
"""Return the instance configuration."""
return self.config.get_instance(instance_name)
def execute(self, instance_name, model, method, *args, **kwargs):
"""Execute a method on a model for a specific instance."""
odoo = self.get_connection(instance_name)
if not odoo:
raise ValueError(f"No connection available for instance '{instance_name}'")
try:
model_obj = odoo.env[model]
result = getattr(model_obj, method)(*args, **kwargs)
return result
except Exception as e:
color_log.Show(
"FAILED",
f"Error executing {method} on {model} for {instance_name}: {e}",
)
raise

36
services/odoo/database.py Normal file
View File

@ -0,0 +1,36 @@
from services.odoo.connection import OdooConnection
class OdooDatabaseManager:
def __init__(self, config_path: str = "config/settings.yaml"):
self.config = OdooConnection(config_path)
def get_databases(self, instance_name: str = None) -> list:
"""Get a list of databases for a specific Odoo instance."""
print("Fetching databases...")
def drop_database(self, instance_name: str, db_name: str) -> None:
"""Drop a specific database from the Odoo instance."""
print(f"Dropping database {db_name} from instance {instance_name}...")
def create_database(
self, instance_name: str, db_name: str, demo: bool = False
) -> None:
"""Create a new database for the Odoo instance."""
print(f"Creating database {db_name} for instance {instance_name}...")
def backup_database(self, instance_name: str, db_name: str) -> None:
"""Backup a specific database from the Odoo instance."""
print(f"Backing up database {db_name} from instance {instance_name}...")
def restore_database(
self, instance_name: str, db_name: str, backup_file: str
) -> None:
"""Restore a database from a backup file."""
print(f"Restoring database {db_name} for instance {instance_name}...")
def duplicate_database(
self, instance_name: str, source_db: str, target_db: str
) -> None:
"""Duplicate a database in the Odoo instance."""
print(f"Duplicating database {source_db} to {target_db}...")

113
services/odoo/module.py Normal file
View File

@ -0,0 +1,113 @@
from services.git.handler import GitHandler
from services.odoo.connection import OdooConnection
import lib.color_log as color_log
class OdooModuleManager:
def __init__(self, config_path: str = "config/settings.yaml"):
self.config = OdooConnection(config_path)
def _manage_module(
self, action: str, instance_name: str = None, module_names: list = None
) -> None:
"""Generic method to install, uninstall, or upgrade modules."""
if action not in {"install", "uninstall", "upgrade"}:
raise ValueError(f"Invalid action: {action}")
self.config.connect(instance_name)
for instance in self.config.get_instances():
if instance_name and instance["name"] != instance_name:
continue
color_log.Show("INFO", f"Processing instance: {instance['name']}")
for module_name in module_names:
try:
color_log.Show(
"INFO",
f"{action.capitalize()}ing module: {module_name} in {instance['name']}",
)
module_ids = self.config.execute(
instance["name"],
"ir.module.module",
"search",
[("name", "=", module_name)],
)
if not module_ids and action in ["upgrade", "uninstall"]:
color_log.Show(
"WARNING",
f"Module {module_name} not found in {instance['name']}, skipping.",
)
continue
button_action = f"button_immediate_{action}"
self.config.execute(
instance["name"],
"ir.module.module",
button_action,
module_ids, # Pass list directly instead of wrapping in another list
)
color_log.Show(
"OK",
f"Module {module_name} {action}ed successfully in {instance['name']}",
)
except Exception as e:
color_log.Show(
"FAILED",
f"Error while {action}ing {module_name} in {instance['name']}: {e}",
)
def get_modules(self, instance_name: str = None) -> list:
"""Get a list of installed modules for the specified instance(s)."""
self.config.connect(instance_name)
modules = []
for instance in self.config.get_instances():
if instance_name and instance["name"] != instance_name:
continue
color_log.Show("INFO", f"Fetching modules for instance: {instance['name']}")
if not instance.get("modules"):
color_log.Show(
"WARNING",
f"No modules found for instance {instance['name']}, skipping.",
)
continue
modules.extend(instance["modules"])
return modules
def install(self, instance_name: str = None, module_names: list = None) -> None:
"""Install multiple modules for the specified instance(s)."""
self._manage_module("install", instance_name, module_names)
def uninstall(self, instance_name: str = None, module_names: list = None) -> None:
"""Uninstall multiple modules for the specified instance(s)."""
self._manage_module("uninstall", instance_name, module_names)
def upgrade(self, instance_name: str = None, module_names: list = None) -> None:
"""Upgrade multiple modules for the specified instance(s)."""
self._manage_module("upgrade", instance_name, module_names)
def is_module_installed(self, instance_name: str, module_name: str) -> bool:
"""Check if a module is installed in the specified instance.
Args:
instance_name (str): Name of the Odoo instance
module_name (str): Name of the module to check
Returns:
bool: True if the module is installed, False otherwise
"""
self.config.connect(instance_name)
try:
module_ids = self.config.execute(
instance_name,
"ir.module.module",
"search",
[("name", "=", module_name), ("state", "=", "installed")],
)
return bool(module_ids)
except Exception as e:
color_log.Show(
"FAILED",
f"Error checking module {module_name} in {instance_name}: {e}",
)
return False

87
services/odoo/service.py Normal file
View File

@ -0,0 +1,87 @@
import subprocess
from services.odoo.connection import OdooConnection
import lib.color_log as color_log
class OdooServiceManager:
def __init__(self, config_path="config/settings.yaml"):
self.config = OdooConnection(config_path)
def _execute_command(self, cmd, instance_name):
"""Execute a shell command and handle errors."""
try:
color_log.Show("INFO", f"Executing command: {cmd}")
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
color_log.Show("OK", f"Command executed successfully for {instance_name}")
except subprocess.CalledProcessError as e:
color_log.Show(
"FAILED",
f"Error performing service operation for {instance_name}: {e}",
)
raise
def _get_command(self, instance, action):
"""
Generate the appropriate command based on instance type and action (stop/restart).
"""
service_type = instance.get("type", "systemctl")
host = instance["host"]
service_name = instance.get("service_name", f"odoo_{instance['name']}")
ssh_settings = instance.get("ssh", {})
ssh_user = ssh_settings.get("user", "root")
ssh_key_path = ssh_settings.get("key_path")
local_host = host in ["localhost", "127.0.0.1"]
if service_type == "systemctl":
cmd = f"systemctl {action} {service_name}"
elif service_type == "docker":
container_name = instance.get("container_name", f"odoo_{instance['name']}")
cmd = f"docker {action} {container_name}"
else:
color_log.Show(
"WARNING",
f"Unsupported service type '{service_type}' for {instance['name']}",
)
return None
if not local_host:
if not ssh_key_path:
cmd = f"ssh -t {ssh_user}@{host} 'sudo {cmd}'"
else:
cmd = f"ssh -i {ssh_key_path} {ssh_user}@{host} 'sudo {cmd}'"
return cmd
def manage_service(self, action, instance_name=None):
"""
Manage the Odoo service (stop or restart) for given instances.
:param action: "stop" or "restart"
:param instance_name: Specific instance name, or None for all instances.
"""
if action not in ["stop", "restart", "start"]:
raise ValueError("Action must be 'stop' or 'restart'.")
for instance in self.config.get_instances():
if instance_name and instance["name"] != instance_name:
continue
color_log.Show(
"INFO",
f"{action.capitalize()}ing service for instance: {instance['name']}",
)
cmd = self._get_command(instance, action)
if cmd:
self._execute_command(cmd, instance["name"])
def stop_service(self, instance_name=None):
"""Stop the Odoo service based on the instance type"""
self.manage_service("stop", instance_name)
def restart_service(self, instance_name=None):
"""Restart the Odoo service based on the instance type"""
self.manage_service("restart", instance_name)
def start_service(self, instance_name=None):
"""Start the Odoo service based on the instance type"""
self.manage_service("start", instance_name)

View File

@ -1,3 +0,0 @@
#!/usr/bin/bash
set +x
sed -i "s/TAG := \$(shell rev-parse --abbrev-ref HEAD)/TAG := $1/g" Makefile

View File

@ -1,92 +0,0 @@
#!/usr/bin/env python
"""Install/update/uninstall specified odoo module."""
import odoorpc
import argparse
USER = "admin"
PASSWORD = "admin"
HOST = "localhost"
PORT = "8069"
DB = "odoodb"
def prepare_args():
"""Prepare arguments for module action RPC call."""
parser = argparse.ArgumentParser(
description="Run modules install, upgrade or uninstall."
)
parser.add_argument(
"-i",
"--install",
help="Comma separated list of modules to install",
)
parser.add_argument(
"-u",
"--upgrade",
help="Comma separated list of modules to upgrade",
)
parser.add_argument(
"-del",
"--delete",
help="Comma separated list of modules to uninstall",
)
parser.add_argument(
"--user",
help="User to log in with",
default=USER,
)
parser.add_argument(
"--password",
help="Password to log in with",
default=PASSWORD,
)
parser.add_argument(
"--host",
help="Host to log in to",
default=HOST,
)
parser.add_argument(
"--port",
help="Odoo port",
default=PORT,
)
parser.add_argument(
"-d",
"--database",
help="Database name to log in to",
default=DB,
)
return parser.parse_args()
def login(user, password, host, port, database):
"""Login to Odoo database and return connection object."""
odoo = odoorpc.ODOO(host, port=port)
odoo.login(database, user, password)
return odoo
def _find_modules(env, module_names):
IrModuleModule = env["ir.module.module"]
modules = module_names.replace(" ", "").split(",")
module_ids = IrModuleModule.search([("name", "in", modules)])
return IrModuleModule.browse(module_ids)
def trigger_action(env, module_names, action):
modules = _find_modules(env, module_names)
method = getattr(modules, f"button_immediate_{action}")
return method()
if __name__ == "__main__":
args = prepare_args()
odoo = login(args.user, args.password, args.host, args.port, args.database)
env = odoo.env
if args.install:
trigger_action(env, args.install, "install")
if args.upgrade:
trigger_action(env, args.upgrade, "upgrade")
if args.delete:
trigger_action(env, args.delete, "uninstall")

25
utility.mk Normal file
View File

@ -0,0 +1,25 @@
UTILITY_DIR =${PWD}/utility
SCRIPTS_DIR = ${UTILITY_DIR}/scripts
UTILITY_CONFIG = ${PWD}/utility/config/settings.yaml
update_modules_all:
${PYTHON} -m utility.scripts.update_modules upgrade ${UTILITY_CONFIG}
cleanup_addons:
@bash ${SCRIPT_PATH}/clean_up_addons.sh $(shell echo $(ADDONS) | tr ',' ' ')
gen_config:
${SCRIPT_PATH}/init_config.sh --native ${ADDONS} ${DB_USER} ${DB_PASSWORD} ${DB_SERVER} ${DB_PORT}
download_backup:
@if [ -z "${LINKDB}" ]; then \
LinkDB=${BACKUP}; \
read -p "LinkDownload [default: ${BACKUP}]: " InputLinkDB; \
LinkDB=$${InputLinkDB:-${BACKUP}}; \
else \
LinkDB=${LINKDB}; \
fi; \
Filename=$$(basename $$LinkDB); \
echo "Downloading $$Filename from: $$LinkDB"; \
${SCRIPT_PATH}/download_backup.sh $$LinkDB backup.zip
gen_config_docker:
${SCRIPT_PATH}/init_config.sh --docker ${ODOO_IMAGE} ${TAG} ${CONTAINER_ID}
update_tag:
${SCRIPT_PATH}/update_tag.sh $(CURR_BRANCH)