KaySar12 2025-04-03 09:46:03 +07:00
parent 20dd9cf791
commit 9a1c76e1af
18 changed files with 2145 additions and 0 deletions

103
backup_db.py Normal file

@@ -0,0 +1,103 @@
#!/usr/bin/env python
import odoorpc
import color_log
import argparse
import sys
import os
from datetime import datetime
# Default configuration
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
BACKUP_DIR = "odoo_backups"
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
"""Establish and verify Odoo connection."""
try:
odoo = odoorpc.ODOO(args.host, port=args.port)
color_log.Show(INFO, f"Available databases: {odoo.db.list()}")
# odoo.login(args.db_name, args.username, args.password)
# color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
return odoo
except odoorpc.error.RPCError as e:
color_log.Show(FAIL, f"Login failed: {e}")
sys.exit(1)
except Exception as e:
color_log.Show(FAIL, f"Connection error: {e}")
sys.exit(1)
def parse_arguments() -> argparse.Namespace:
"""Parse and validate command-line arguments."""
parser = argparse.ArgumentParser(description="Backup all Odoo databases.")
parser.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host")
parser.add_argument(
"--port", type=int, default=DEFAULT_PORT, help="Odoo server port"
)
parser.add_argument(
"--admin-password", required=True, help="Odoo master admin password"
)
parser.add_argument(
"--database",
nargs="*",
help="Specific databases to backup (leave empty to backup all databases)",
)
return parser.parse_args()
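# A hypothetical invocation for reference (values are placeholders, not project defaults):
#   python backup_db.py --host localhost --port 8069 --admin-password secret --database prod staging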
def backup_database(odoo: odoorpc.ODOO, db_name: str, admin_password: str):
"""Backup a single Odoo database."""
date_str = datetime.now().strftime("%m-%d-%Y")
try:
print(f"Backing up database: {db_name}...")
timeout_backup = odoo.config["timeout"]
odoo.config["timeout"] = 600  # Timeout set to 10 minutes
try:
backup_data = odoo.db.dump(admin_password, db_name)
finally:
# Restore the previous timeout even if the dump fails
odoo.config["timeout"] = timeout_backup
os.makedirs(BACKUP_DIR, exist_ok=True)
backup_path = os.path.join(BACKUP_DIR, f"{db_name}-{date_str}.zip")
# Ensure BytesIO object is written correctly
with open(backup_path, "wb") as f:
f.write(backup_data.read())
print(f"Backup saved: {backup_path}")
except Exception as e:
print(f"Failed to backup {db_name}: {e}")
def backup_all_databases(odoo: odoorpc.ODOO, admin_password: str):
"""Backup all available databases."""
try:
db_list = odoo.db.list()
print("Databases found:", db_list)
for db_name in db_list:
backup_database(odoo, db_name, admin_password)
except Exception as e:
print(f"Error retrieving database list: {e}")
def main():
"""Main execution flow."""
args = parse_arguments()
odoo = connect_to_odoo(args)
if args.database:
for db_name in args.database:
backup_database(odoo, db_name, args.admin_password)
else:
backup_all_databases(odoo, args.admin_password)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
color_log.Show(FAIL, "\nOperation cancelled by user")
sys.exit(1)

101
clean_up_addons.sh Executable file

@@ -0,0 +1,101 @@
#!/bin/bash
# Check if at least one root folder is provided as an argument
if [ $# -eq 0 ]; then
echo "Usage: $0 <root_folder> [<root_folder>...]"
echo "Please provide at least one root folder path."
exit 1
fi
# Define the protected items list file
PROTECTED_LIST="protected.txt"
if [ ! -f "$PROTECTED_LIST" ]; then
echo "Error: '$PROTECTED_LIST' not found."
echo "Please create 'protected.txt' one directory up with a list of protected files/folders (one per line)."
exit 1
fi
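# For reference, an illustrative protected.txt (assumed format: one name or glob per line):
#   web_enterprise
#   */static/lib/*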
# Safeguard: Check if any file/folder matching patterns in protected.txt exists in a root folder
check_protected_items() {
local root_dir="$1"
while IFS= read -r pattern; do
# Skip empty lines
[ -z "$pattern" ] && continue
# Handle wildcards by using find for pattern matching
if [[ "$pattern" == *"*"* ]]; then
# Convert pattern to a find-compatible search
if [[ "$pattern" == /* ]]; then
search_path="${root_dir}${pattern}"
else
search_path="${root_dir}/${pattern}"
fi
# Use find to check if any files match the pattern
if find "$root_dir" -path "$search_path" 2>/dev/null | grep -q .; then
echo "Error: Protected pattern '$pattern' matches files in '$root_dir'. Aborting execution."
exit 1
fi
else
# Exact match for non-wildcard entries
if [ -e "$root_dir/$pattern" ]; then
echo "Error: Protected item '$pattern' found in '$root_dir'. Aborting execution."
exit 1
fi
fi
done < "$PROTECTED_LIST"
}
# Function to check and delete subfolders
delete_non_manifest_folders() {
local dir="$1"
# Loop through all immediate subdirectories in the given directory
for subfolder in "$dir"/*/ ; do
# Check if it's a directory
if [ -d "$subfolder" ]; then
# Check if __manifest__.py exists in this subfolder
if [ ! -f "$subfolder/__manifest__.py" ]; then
echo "Deleting '$subfolder' (no __manifest__.py found)"
rm -rf "$subfolder"
else
echo "Keeping '$subfolder' (__manifest__.py found)"
fi
fi
done
}
# Process each root folder provided as an argument
for ROOT_FOLDER in "$@"; do
# Check if the root folder exists and is a directory
if [ ! -d "$ROOT_FOLDER" ]; then
echo "Error: '$ROOT_FOLDER' is not a valid directory. Skipping."
continue
fi
# Perform the safeguard check for this root folder
echo "Checking for protected items in '$ROOT_FOLDER' from '$PROTECTED_LIST'..."
check_protected_items "$ROOT_FOLDER"
# Change to the root folder to handle relative paths cleanly
cd "$ROOT_FOLDER" || {
echo "Error: Could not change to directory '$ROOT_FOLDER'. Skipping."
continue
}
# Call the function with the current root folder
echo "Processing '$ROOT_FOLDER'..."
delete_non_manifest_folders "."
# Return to the original directory to process the next root folder
cd - > /dev/null || {
echo "Error: Could not return from '$ROOT_FOLDER'. Exiting."
exit 1
}
echo "Cleanup complete for '$ROOT_FOLDER'!"
done
echo "All root folders processed!"
exit 0

27
clean_up_virtualenvs.sh Executable file

@@ -0,0 +1,27 @@
#!/bin/bash
# Require the number of virtual environments to keep as the first argument
if [ -z "${1:-}" ]; then
echo "Usage: $0 <number_to_keep>"
exit 1
fi
# Get the current branch name
branch_name=$(git rev-parse --abbrev-ref HEAD)
# List virtual environments for this branch; --bare prints bare names (no '*' marker or version columns)
virtualenvs=$(pyenv virtualenvs --bare | sort -u | grep "$branch_name")
# Count the number of virtual environments
count=$(echo "$virtualenvs" | wc -l)
# Calculate how many virtual environments to keep
keep_count=$((count - $1))
# If there are more virtual environments than the number to keep, delete the oldest ones
if (( keep_count > 0 )); then
# Get the oldest virtual environments (assuming they are listed first)
oldest_venvs=$(echo "$virtualenvs" | head -n "$keep_count")
# Loop through the oldest virtual environments and delete them
for venv in $oldest_venvs; do
echo "Deleting virtual environment: $venv"
pyenv virtualenv-delete -f "$venv"
done
fi
echo "Old virtual environments containing '$branch_name' deleted."

42
color_log.py Normal file

@@ -0,0 +1,42 @@
#!/usr/bin/env python3
import sys
def colorize(text, code):
"""Colorizes text using ANSI escape codes."""
return f"\033[{code}m{text}\033[0m"
def Show(status, message):
"""Displays a message with a status indicator."""
colors = {
0: (
colorize("[", "90") + colorize(" OK ", "38;5;154") + colorize("]", "90")
), # Green, Grey
1: (
colorize("[", "90") + colorize(" FAILED ", "91") + colorize("]", "90")
), # Red, Grey
2: (
colorize("[", "90") + colorize(" INFO ", "38;5;154") + colorize("]", "90")
), # Green, Grey
3: (
colorize("[", "90") + colorize(" WARNING ", "33") + colorize("]", "90")
), # Yellow, Grey
}
print(f"{colors.get(status, '')} {message}")
if status == 1:
sys.exit(1)
def Warn(message):
"""Displays a warning message in red."""
print(colorize(message, "91"))
def GreyStart():
"""Starts a grey-colored output."""
print(colorize("", "90"), end="")
def ColorReset():
"""Resets the output color."""
print("\033[0m", end="")

10
dir2file.sh Executable file

@@ -0,0 +1,10 @@
#!/bin/bash
# Usage: dir2file.sh <input_dir> <output_file>
if [ $# -ne 2 ]; then
echo "Usage: $0 <input_dir> <output_file>"
exit 1
fi
input_dir="$1"
output_file="$2"
# List only directories (-type d) at depth 1 (-maxdepth 1) and append their names to the output file
find "$input_dir" -maxdepth 1 -type d -not -path "$input_dir" -exec basename {} \; | sort >> "$output_file"
echo "Folder list has been written to $output_file"

82
download_backup.sh Executable file

@@ -0,0 +1,82 @@
#!/usr/bin/bash
export PATH=/usr/sbin:$PATH
export DEBIAN_FRONTEND=noninteractive
set -euo pipefail
readonly COLOUR_RESET='\e[0m'
readonly aCOLOUR=(
'\e[38;5;154m' # green | Lines, bullets and separators
'\e[1m' # Bold white | Main descriptions
'\e[90m' # Grey | Credits
'\e[91m' # Red | Update notifications Alert
'\e[33m' # Yellow | Emphasis
)
trap 'onCtrlC' INT
onCtrlC() {
echo -e "${COLOUR_RESET}"
exit 1
}
Show() {
# OK
if (($1 == 0)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[0]} OK $COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
# FAILED
elif (($1 == 1)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[3]}FAILED$COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
exit 1
# INFO
elif (($1 == 2)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[0]} INFO $COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
# NOTICE
elif (($1 == 3)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[4]}NOTICE$COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
fi
}
Warn() {
echo -e "${aCOLOUR[3]}$1$COLOUR_RESET"
}
GreyStart() {
echo -e "${aCOLOUR[2]}\c"
}
ColorReset() {
echo -e "$COLOUR_RESET\c"
}
main() {
if [ $# -ne 2 ]; then
echo "Usage: $0 <download_url> <backup_file_name>"
exit 1
fi
DEPLOYMENT_DIR=$(pwd)/deployment
BACKUP_DIR="$DEPLOYMENT_DIR/backup"
DOWNLOAD_URL="$1"
BACKUP_FILE="$BACKUP_DIR/$2"
# Create the backup directory if it does not exist
if [[ ! -d "$BACKUP_DIR" ]]; then
echo "Backup directory does not exist. Creating: $BACKUP_DIR"
mkdir -p "$BACKUP_DIR"
fi
# Check if the download URL is valid
echo "Checking if the URL is valid: $DOWNLOAD_URL"
if curl --head --silent --fail "$DOWNLOAD_URL" > /dev/null; then
echo "URL is valid. Proceeding with download..."
else
Show 1 "Error: Invalid or inaccessible URL: $DOWNLOAD_URL"
exit 1
fi
# Download the file and rename it to backup.zip
wget -O "$BACKUP_FILE" "$DOWNLOAD_URL"
# Check if the file was downloaded
if [[ -f "$BACKUP_FILE" ]]; then
Show 0 "Backup file successfully downloaded to: $BACKUP_FILE"
else
Show 1 "Error: Backup file was not downloaded."
exit 1
fi
}
main "$@"

67
gen_config.py Normal file

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
import argparse
import configparser
import shutil
import os
import socket
import color_log
def find_available_port(start_port=80):
"""Finds an available port starting from the given port."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
while True:
try:
sock.bind(('0.0.0.0', start_port))
color_log.Show(3,f" {start_port} is Open")
return start_port
except OSError as e:
if e.errno == 98: # Address already in use
print(f"{start_port} already in use , Try other port ...")
start_port += 1
else:
raise
def main():
"""
Generates a random password and finds an available port.
Updates the Odoo configuration file and .env file with these values.
"""
parser = argparse.ArgumentParser(description="Generate Odoo configuration")
parser.add_argument('--db_user', type=str, help='Database user written to odoo.conf')
parser.add_argument('--db_pass', type=str, help='Database password written to odoo.conf')
parser.add_argument('--deploy_path', type=str, help='Directory containing odoo.conf.template')
parser.add_argument('--addons_path', type=str, help='Value for addons_path in odoo.conf')
# parser.add_argument('--db_filter', type=str, help='')
parser.add_argument('--db_port', type=int, help='Database port written to odoo.conf')
parser.add_argument('--db_server', type=str, help='Database host written to odoo.conf')
args = parser.parse_args()
db_port = args.db_port
db_user = args.db_user
db_pass = args.db_pass
db_server = args.db_server
app_port = find_available_port(8069)
addons_path = args.addons_path
base_dir= args.deploy_path
# db_filter= args.db_filter
# Copy template files
os.makedirs(f"{base_dir}/etc", exist_ok=True)
color_log.Show(3,f"Copy {base_dir}/odoo.conf.template to {base_dir}/etc/odoo.conf")
shutil.copyfile(f'{base_dir}/odoo.conf.template', f'{base_dir}/odoo.conf')
# Update Odoo configuration file
config = configparser.ConfigParser()
config.read(f'{base_dir}/odoo.conf')
config['options']['db_host'] = str(db_server)
config['options']['db_user'] = db_user
config['options']['db_password'] = db_pass
config['options']['db_port'] = str(db_port)
config['options']['addons_path'] = addons_path
config['options']['xmlrpc_port'] = str(app_port)
config['options']['dbfilter'] = ".*"
config['options']['proxy_mode'] = "True"
with open(f'{base_dir}/odoo.conf', 'w') as configfile:
config.write(configfile)
if __name__ == "__main__":
main()
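# Example (hypothetical values, mirroring init_config.sh's Generate_Config_Native):
#   ./gen_config.py --db_user myrepo --db_pass secret --deploy_path /opt/odoo \
#       --addons_path /opt/odoo/addons --db_port 5432 --db_server localhost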

93
gen_config_docker.py Executable file

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
import argparse
import configparser
import shutil
import os
from dotenv import set_key
from pathlib import Path
import socket
import secrets
import string
import color_log
def generate_password(length=16):
"""Generates a random password of specified length."""
alphabet = string.ascii_letters + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(length))
def find_available_port(start_port=80):
"""Finds an available port starting from the given port."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
while True:
try:
sock.bind(('0.0.0.0', start_port))
color_log.Show(3,f" {start_port} is Open")
return start_port
except OSError as e:
if e.errno == 98: # Address already in use
print(f"{start_port} already in use , Try other port ...")
start_port += 1
else:
raise
def main():
"""
Generates a random password and finds an available port.
Updates the Odoo configuration file and .env file with these values.
"""
parser = argparse.ArgumentParser(description="Generate Odoo configuration")
parser.add_argument('--db_port', type=int, help='Database port written to odoo.conf')
parser.add_argument('--db_user', type=str, help='Database user written to odoo.conf and .env')
parser.add_argument('--deploy_path', type=str, help='Directory containing odoo.conf.template and env.template')
parser.add_argument('--image', type=str, help='Odoo Docker image name (written to .env as ODOO_IMAGE)')
parser.add_argument('--tag', type=str, help='Odoo Docker image tag (written to .env as ODOO_TAG)')
parser.add_argument('--addons', type=str, help='Host addons path (written to .env as ODOO_ADDONS)')
parser.add_argument('--config', type=str, help='Host config path (written to .env as ODOO_CONFIG)')
parser.add_argument('--container', type=str, help='Odoo container name (written to .env as ODOO_CONTAINER)')
parser.add_argument('--backup', type=str, help='Host backup path (written to .env as ODOO_BACKUP)')
args = parser.parse_args()
db_port = args.db_port
db_pass = "smartyourlife"
db_user = args.db_user
base_dir= args.deploy_path
image=args.image
tag=args.tag
container=args.container
addons=args.addons
config_path=args.config
app_port = 10017
backup = args.backup
# Copy template files
os.makedirs(f"{base_dir}/etc", exist_ok=True)
color_log.Show(3,f"Copy {base_dir}/odoo.conf.template to {base_dir}/etc/odoo.conf")
shutil.copyfile(f'{base_dir}/odoo.conf.template', f'{base_dir}/etc/odoo.conf')
shutil.copyfile(f'{base_dir}/env.template', f'{base_dir}/.env')
# Update Odoo configuration file
config = configparser.ConfigParser()
config.read(f'{base_dir}/etc/odoo.conf')
config['options']['db_host'] = "db"
config['options']['db_user'] = db_user
config['options']['db_password'] = db_pass
config['options']['db_port'] = str(db_port)
config['options']['addons_path'] = "/mnt/extra-addons"
config['options']['data_dir'] = "/var/lib/odoo"
config['options']['proxy_mode'] = "True"
with open(f'{base_dir}/etc/odoo.conf', 'w') as configfile:
config.write(configfile)
# Update .env file
env_file_path = Path("deployment/.env")
set_key(dotenv_path=env_file_path, key_to_set="COMPOSE_PROJECT_NAME", value_to_set=f"odoo-{tag}",quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="PG_PORT", value_to_set=find_available_port(int(os.getenv('DB_PORT','5432'))+1),quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="PG_USER", value_to_set=db_user,quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="PG_PASS", value_to_set=db_pass,quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_CONFIG", value_to_set=config_path,quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_ADDONS", value_to_set=addons,quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_PORT", value_to_set=find_available_port(app_port),quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_IMAGE", value_to_set=image.lower(),quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_TAG", value_to_set=tag,quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_CONTAINER", value_to_set=container.lower(),quote_mode="never")
set_key(dotenv_path=env_file_path, key_to_set="ODOO_BACKUP", value_to_set=backup,quote_mode="never")
if __name__ == "__main__":
main()
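# Example (hypothetical values, mirroring init_config.sh's Generate_Config_Docker):
#   ./gen_config_docker.py --db_port 5432 --db_user myrepo --deploy_path ./deployment \
#       --image nexterp/odoo --tag 18.0 --container odoo-18 \
#       --addons ./deployment/addons --config ./deployment/etc --backup ./deployment/backup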

245
init_config.sh Executable file

@@ -0,0 +1,245 @@
#!/usr/bin/bash
export PATH=/usr/sbin:$PATH
export DEBIAN_FRONTEND=noninteractive
set -euo pipefail
DEPLOY_PATH=$(pwd)/deployment
SETUP_PATH=$(pwd)/setup
PIP=$(pwd)/venv/bin/pip
PYTHON=$(pwd)/venv/bin/python
ODOO_ADDONS=${DEPLOY_PATH}/addons
ODOO_CONFIG=${DEPLOY_PATH}/etc
ODOO_BACKUP=${DEPLOY_PATH}/backup
# System
DEPENDS_PACKAGE=('wget' 'curl' 'git' 'unzip' 'make' 'wkhtmltopdf' 'postgresql-client')
DEPENDS_COMMAND=('wget' 'curl' 'git' 'unzip' 'make' 'wkhtmltopdf' 'psql')
((EUID)) && sudo_cmd="sudo" || sudo_cmd=""
readonly MINIMUM_DOCER_VERSION="20"
UNAME_U="$(uname -s)"
readonly UNAME_U
readonly COLOUR_RESET='\e[0m'
readonly aCOLOUR=(
'\e[38;5;154m' # green | Lines, bullets and separators
'\e[1m' # Bold white | Main descriptions
'\e[90m' # Grey | Credits
'\e[91m' # Red | Update notifications Alert
'\e[33m' # Yellow | Emphasis
)
trap 'onCtrlC' INT
onCtrlC() {
echo -e "${COLOUR_RESET}"
exit 1
}
Show() {
# OK
if (($1 == 0)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[0]} OK $COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
# FAILED
elif (($1 == 1)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[3]}FAILED$COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
exit 1
# INFO
elif (($1 == 2)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[0]} INFO $COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
# NOTICE
elif (($1 == 3)); then
echo -e "${aCOLOUR[2]}[$COLOUR_RESET${aCOLOUR[4]}NOTICE$COLOUR_RESET${aCOLOUR[2]}]$COLOUR_RESET $2"
fi
}
Warn() {
echo -e "${aCOLOUR[3]}$1$COLOUR_RESET"
}
GreyStart() {
echo -e "${aCOLOUR[2]}\c"
}
ColorReset() {
echo -e "$COLOUR_RESET\c"
}
Update_Package_Resource() {
GreyStart
if [ -x "$(command -v apk)" ]; then
${sudo_cmd} apk update
elif [ -x "$(command -v apt)" ]; then
${sudo_cmd} apt update
elif [ -x "$(command -v dnf)" ]; then
${sudo_cmd} dnf check-update
elif [ -x "$(command -v zypper)" ]; then
${sudo_cmd} zypper update
elif [ -x "$(command -v yum)" ]; then
${sudo_cmd} yum update
fi
ColorReset
}
# 3 Check OS
Check_OS() {
if [[ $UNAME_U == *Linux* ]]; then
Show 0 "Your System is : $UNAME_U"
else
Show 1 "This script is only for Linux."
exit 1
fi
}
Install_Depends() {
for ((i = 0; i < ${#DEPENDS_COMMAND[@]}; i++)); do
cmd=${DEPENDS_COMMAND[i]}
if [[ ! -x $(${sudo_cmd} which "$cmd") ]]; then
packagesNeeded=${DEPENDS_PACKAGE[i]}
Show 2 "Install the necessary dependencies: \e[33m$packagesNeeded \e[0m"
GreyStart
if [ -x "$(command -v apk)" ]; then
${sudo_cmd} apk add --no-cache "$packagesNeeded"
elif [ -x "$(command -v apt-get)" ]; then
${sudo_cmd} apt-get -y -q install "$packagesNeeded" --no-upgrade
elif [ -x "$(command -v dnf)" ]; then
${sudo_cmd} dnf install "$packagesNeeded"
elif [ -x "$(command -v zypper)" ]; then
${sudo_cmd} zypper install "$packagesNeeded"
elif [ -x "$(command -v yum)" ]; then
${sudo_cmd} yum install "$packagesNeeded"
elif [ -x "$(command -v pacman)" ]; then
${sudo_cmd} pacman -S "$packagesNeeded"
elif [ -x "$(command -v paru)" ]; then
${sudo_cmd} paru -S "$packagesNeeded"
else
Show 1 "Package manager not found. You must manually install: \e[33m$packagesNeeded \e[0m"
fi
ColorReset
else
Show 2 "\e[33m ${DEPENDS_COMMAND[i]}\e[0m Installed"
fi
done
}
Check_Dependency_Installation() {
for ((i = 0; i < ${#DEPENDS_COMMAND[@]}; i++)); do
cmd=${DEPENDS_COMMAND[i]}
if [[ ! -x $(${sudo_cmd} which "$cmd") ]]; then
packagesNeeded=${DEPENDS_PACKAGE[i]}
Show 1 "Dependency \e[33m$packagesNeeded \e[0m installation failed, please try again manually!"
exit 1
fi
done
}
Check_Docker_Install() {
if [[ -x "$(command -v docker)" ]]; then
# Test the assignment inside the condition: under 'set -e' a failed substitution would abort before $? is checked
if ! Docker_Version=$(${sudo_cmd} docker version --format '{{.Server.Version}}'); then
Install_Docker
elif [[ ${Docker_Version:0:2} -lt "${MINIMUM_DOCER_VERSION}" ]]; then
Show 1 "Recommended minimum Docker version is \e[33m${MINIMUM_DOCER_VERSION}.xx.xx\e[0m,\Current Docker verison is \e[33m${Docker_Version}\e[0m,\nPlease uninstall current Docker and rerun the CasaOS installation script."
exit 1
else
Show 0 "Current Docker verison is ${Docker_Version}."
fi
else
Install_Docker
fi
}
Install_Docker() {
Show 2 "Install the necessary dependencies: \e[33mDocker \e[0m"
if [[ ! -d "${PREFIX}/etc/apt/sources.list.d" ]]; then
${sudo_cmd} mkdir -p "${PREFIX}/etc/apt/sources.list.d"
fi
GreyStart
install_status=0
# Default REGION when unset so 'set -u' does not abort; capture the installer's status before ColorReset clobbers $?
if [[ "${REGION:-}" = "CN" ]]; then
${sudo_cmd} curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun || install_status=$?
else
${sudo_cmd} curl -fsSL https://get.docker.com | bash || install_status=$?
fi
ColorReset
if [[ $install_status -ne 0 ]]; then
Show 1 "Installation failed, please try again."
exit 1
else
Check_Docker_Install_Final
fi
}
Check_Docker_Install_Final() {
if [[ -x "$(command -v docker)" ]]; then
# Same pattern as Check_Docker_Install: test the assignment directly
if ! Docker_Version=$(${sudo_cmd} docker version --format '{{.Server.Version}}'); then
Install_Docker
elif [[ ${Docker_Version:0:2} -lt "${MINIMUM_DOCER_VERSION}" ]]; then
Show 1 "Recommended minimum Docker version is \e[33m${MINIMUM_DOCER_VERSION}.xx.xx\e[0m,\Current Docker verison is \e[33m${Docker_Version}\e[0m,\nPlease uninstall current Docker and rerun the CasaOS installation script."
exit 1
else
Show 0 "Current Docker verison is ${Docker_Version}."
Check_Docker_Running
fi
else
Show 1 "Installation failed, please run 'curl -fsSL https://get.docker.com | bash' and rerun the CasaOS installation script."
exit 1
fi
}
Generate_Config_Docker(){
ODOO_IMAGE=${1:-}
ODOO_TAG=${2:-}
ODOO_CONTAINER=${3:-}
if [[ ! -f "${DEPLOY_PATH}/.env" ]]; then
cp "${DEPLOY_PATH}/env.template" "${DEPLOY_PATH}/.env"
fi
USER="${REPO_NAME:-"default_repo"}"
# Convert to lowercase
USER="${USER,,}"
${PYTHON} "$SETUP_PATH/gen_config_docker.py" --db_port 5432 --db_user $USER --deploy_path "$DEPLOY_PATH" \
--image "${ODOO_IMAGE}" --container "${ODOO_CONTAINER}" --tag "${ODOO_TAG:=latest}" \
--addons "${ODOO_ADDONS}" --config "${ODOO_CONFIG}" --backup "${ODOO_BACKUP}"
Show 0 " Generate Config Complete"
}
Generate_Config_Native(){
DB_USER=${2:-}
DB_PASSWORD=${3:-}
DB_SERVER=${4:-}
DB_PORT=${5:-}
ADDONS=${1:-}
REPO_NAME=$(basename "$(git rev-parse --show-toplevel)" | sed -E 's/[.-]/_/g')
USER="${REPO_NAME:-"default_repo"}"
# Convert to lowercase
USER="${USER,,}"
PASSWORD="$(openssl rand -hex 24)"
# Check if the user already exists
USER_EXISTS=$(psql "postgresql://${DB_USER}:${DB_PASSWORD}@${DB_SERVER}:${DB_PORT}/postgres" -t -A -c "SELECT COUNT(*) FROM pg_roles WHERE rolname='$USER';")
if [ $USER_EXISTS -eq 0 ]; then
# User does not exist, create the user
Show 2 "Create the new PostgreSQL username: $USER with password: $PASSWORD"
psql "postgresql://${DB_USER}:${DB_PASSWORD}@${DB_SERVER}:${DB_PORT}/postgres" -c "CREATE USER $USER WITH PASSWORD '$PASSWORD';"
Show 2 "Grant $USER superuser (admin) privileges"
psql "postgresql://${DB_USER}:${DB_PASSWORD}@${DB_SERVER}:${DB_PORT}/postgres" -c "ALTER USER $USER WITH SUPERUSER;"
else
# User exists, update the password (do not try to create)
Show 2 "User $USER already exists, updating password to $PASSWORD"
psql "postgresql://${DB_USER}:${DB_PASSWORD}@${DB_SERVER}:${DB_PORT}/postgres" -c "ALTER USER $USER WITH PASSWORD '$PASSWORD';"
fi
${PYTHON} "$SETUP_PATH/gen_config.py" --db_user $USER --db_pass $PASSWORD --deploy_path "$(pwd)" \
--addons_path $ADDONS --db_port $DB_PORT --db_server $DB_SERVER
Show 0 " Generate Config Complete"
}
main(){
TYPE=${1:-}
Check_OS
# Update_Package_Resource
# Install_Depends
# Check_Dependency_Installation
# Check_Docker_Install
case "$TYPE" in
--native)
Generate_Config_Native "$2" "$3" "$4" "$5" "$6"
;;
--docker)
Generate_Config_Docker "$2" "$3" "$4"
;;
*)
Show 1 "Invalid argument (--docker|--native)"
;;
esac
}
main "$@"

44
merge_module.py Executable file

@@ -0,0 +1,44 @@
#!/usr/bin/env python3
import subprocess
import yaml
import os
import argparse
# Set up argument parsing
parser = argparse.ArgumentParser(
description="Checkout modules from target branch that are not in source branch."
)
parser.add_argument("yaml_file", help="Path to the YAML file")
parser.add_argument("source_branch", help="The source branch")
parser.add_argument("target_branch", help="The target branch")
parser.add_argument("root_repo", help="Path to the root repository")
# Parse the arguments
args = parser.parse_args()
yaml_file = args.yaml_file
source_branch = args.source_branch
target_branch = args.target_branch
root_repo = args.root_repo
# Change to the repository directory
os.chdir(root_repo)
# Read YAML file
with open(yaml_file, "r") as file:
data = yaml.safe_load(file)
# Extract module lists for source and target branches
modules_source = data["branches"].get(source_branch, {}).get("modules", [])
modules_target = data["branches"].get(target_branch, {}).get("modules", [])
# Ensure the latest changes are fetched
subprocess.run(["git", "fetch", "origin"], check=True)
# Checkout source branch first
print(f"Checking out source branch: {source_branch}")
subprocess.run(["git", "checkout", source_branch], check=True)
# Checkout modules in target_branch that are not in source_branch
for module in modules_target:
if module not in modules_source:
print(f"Checking out module: {module}")
subprocess.run(["git", "checkout", target_branch, "--", module], check=True)

33
migrate-lang.sh Executable file

@@ -0,0 +1,33 @@
#!/bin/bash
# Set source and destination repositories
SRC_REPO="/root/dev/NextERP/dev/Viindoo/odoo-18.0"
DEST_REPO="/root/dev/NextERP/dev/odoo18/Odoo18"
LANG="vi"
# Ensure both paths exist
if [ ! -d "$SRC_REPO" ]; then
echo "Error: Source repository does not exist!"
exit 1
fi
if [ ! -d "$DEST_REPO" ]; then
echo "Error: Destination repository does not exist!"
exit 1
fi
# Find and copy vi.po files while preserving directory structure
cd "$SRC_REPO" || exit
find . -type f -name "${LANG}.po" | while read -r file; do
# Get the directory path of the file
dir_path=$(dirname "$file")
# Ensure the destination directory exists
mkdir -p "$DEST_REPO/$dir_path"
# Copy the file
cp "$file" "$DEST_REPO/$dir_path/"
echo "Copied: $file -> $DEST_REPO/$dir_path/"
done
echo "All ${LANG}.po files copied successfully!"

82
modules_scan.sh Executable file

@@ -0,0 +1,82 @@
#!/bin/bash
# Check if required arguments are provided
if [ $# -lt 3 ] || [ $# -gt 4 ]; then
echo "Usage: $0 <input_file> <root_folder> <output_yaml_file> [list_branch]"
echo "Example: $0 exclude_list.txt /path/to/git/repo /path/to/output.yaml 'branch1 branch2'"
exit 1
fi
INPUT_FILE="$1"
ROOT_FOLDER="$2"
OUTPUT_FILE="$3"
LIST_BRANCH="$4"
# Check if input file exists
if [ ! -f "$INPUT_FILE" ]; then
echo "Error: Input file '$INPUT_FILE' not found"
exit 1
fi
# Check if root folder exists
if [ ! -d "$ROOT_FOLDER" ]; then
echo "Error: Root folder '$ROOT_FOLDER' not found"
exit 1
fi
# Check if output YAML file exists, if not create it
if [ ! -f "$OUTPUT_FILE" ]; then
echo "Output file does not exist. Creating $OUTPUT_FILE"
touch "$OUTPUT_FILE"
fi
# Change to root folder
cd "$ROOT_FOLDER" || exit 1
# Initialize output file
echo "branches:" > "$OUTPUT_FILE"
# Get all git branches
git fetch --all
if [ -z "$LIST_BRANCH" ]; then
branches=$(git branch -r | grep -v HEAD | sed 's/origin\///' | sed 's/^[[:space:]]*//')
else
branches=$LIST_BRANCH
fi
# Process each branch
for branch in $branches; do
echo "Processing branch: $branch"
# Checkout branch
git checkout "$branch" 2>/dev/null || continue
# Get all folders in current branch
folders=$(find . -maxdepth 1 -type d -not -path '.' -not -path './.*' | sed 's|./||')
# Array to store modules not in input file
modules=()
# Check each folder against input file
while IFS= read -r folder; do
# Skip if folder is empty
[ -z "$folder" ] && continue
# Check if folder is in input file
if ! grep -Fxq "$folder" "$INPUT_FILE"; then
modules+=("$folder")
fi
done <<< "$folders"
# Write to yaml if there are modules
if [ ${#modules[@]} -gt 0 ]; then
echo " $branch:" >> "$OUTPUT_FILE"
echo " modules:" >> "$OUTPUT_FILE"
for module in "${modules[@]}"; do
echo " - $module" >> "$OUTPUT_FILE"
done
fi
done
echo "Output written to $OUTPUT_FILE"

259
record_cleaner.py Executable file

@@ -0,0 +1,259 @@
#!/usr/bin/env python
"""
Delete records from an Odoo database based on a model and domain filter.
Usage:
record_cleaner.py <db_name> <base_model>
Example:
record_cleaner.py mydb res.partner --domain "[('active', '=', False)]" --force
"""
import argparse
import ast
import json
import multiprocessing as mp
import os
import sys
from typing import Dict, List, Tuple
from functools import partial
import odoorpc
import color_log
# Default configuration
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
DEFAULT_DOMAIN = "[]"
DEFAULT_PROCESS_SIZE = min(mp.cpu_count() * 2, 32) # Dynamic default based on CPU
CACHE_DIR = "cache"
CHUNK_SIZE = 500 # Records per batch for search operations
# Logging levels
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def parse_arguments() -> argparse.Namespace:
"""Parse and validate command-line arguments."""
parser = argparse.ArgumentParser(
description="Safely delete records from an Odoo model with referential integrity checks.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("db_name", help="Database name")
parser.add_argument("base_model", help="Model to delete records from")
parser.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host")
parser.add_argument(
"--port", type=int, default=DEFAULT_PORT, help="Odoo server port"
)
parser.add_argument("--username", default=DEFAULT_USERNAME, help="Odoo username")
parser.add_argument("--password", default=DEFAULT_PASSWORD, help="Odoo password")
parser.add_argument(
"--domain", default=DEFAULT_DOMAIN, help="Domain filter as Python list"
)
parser.add_argument(
"--process-size",
type=int,
default=DEFAULT_PROCESS_SIZE,
help="Number of parallel processes",
)
parser.add_argument(
"--chunk-size",
type=int,
default=CHUNK_SIZE,
help="Records per batch for search operations",
)
action_group = parser.add_mutually_exclusive_group()
action_group.add_argument(
"--force",
action="store_true",
help="Force delete with referential integrity bypass",
)
parser.add_argument(
"--refresh-cache", action="store_true", help="Refresh related models cache"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Simulate operations without making changes",
)
parser.add_argument("--verbose", action="store_true", help="Show detailed output")
args = parser.parse_args()
# Validate domain syntax early
try:
ast.literal_eval(args.domain)
except (ValueError, SyntaxError) as e:
color_log.Show(FAIL, f"Invalid domain syntax: {e}")
sys.exit(1)
return args
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
"""Establish and verify Odoo connection."""
try:
odoo = odoorpc.ODOO(args.host, port=args.port)
if args.verbose:
color_log.Show(INFO, f"Available databases: {odoo.db.list()}")
odoo.login(args.db_name, args.username, args.password)
color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
return odoo
except odoorpc.error.RPCError as e:
color_log.Show(FAIL, f"Login failed: {e}")
sys.exit(1)
except Exception as e:
color_log.Show(FAIL, f"Connection error: {e}")
sys.exit(1)
def get_related_fields(
odoo: odoorpc.ODOO, args: argparse.Namespace
) -> Dict[str, List[str]]:
"""Retrieve related fields with cache management."""
cache_path = os.path.join(CACHE_DIR, args.db_name, f"{args.base_model}.cache.json")
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
if not args.refresh_cache and os.path.exists(cache_path):
with open(cache_path, "r") as f:
color_log.Show(INFO, f"Loaded related models from cache: {args.base_model}")
return json.load(f)
color_log.Show(INFO, f"Building related models cache for {args.base_model}...")
related = {}
Model = odoo.env["ir.model"]
model_ids = Model.search([("model", "!=", args.base_model)])
for model in Model.read(model_ids, ["model"]):
try:
fields = odoo.env[model["model"]].fields_get()
related_fields = [
name
for name, desc in fields.items()
if desc.get("relation") == args.base_model
and desc.get("type") in ["many2one", "many2many", "one2many"]
]
if related_fields:
related[model["model"]] = related_fields
except Exception as e:
if args.verbose:
color_log.Show(WARNING, f"Skipping {model['model']}: {str(e)}")
with open(cache_path, "w") as f:
json.dump(related, f, indent=2)
return related
def chunker(seq: List[int], size: int) -> List[List[int]]:
"""Efficient batch generator."""
return [seq[pos : pos + size] for pos in range(0, len(seq), size)]
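# e.g. chunker(list(range(5)), 2) -> [[0, 1], [2, 3], [4]]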
def process_batch(
args: argparse.Namespace, batch: List[int], related: Dict[str, List[str]]
) -> Tuple[int, int, int]:
"""Process a batch of records with proper error handling."""
deleted = archived = skipped = 0
odoo = connect_to_odoo(args)
model = odoo.env[args.base_model]
for record_id in batch:
try:
if args.dry_run:
color_log.Show(INFO, f"[DRY-RUN] Would process record {record_id}")
continue
# Without --force, archive instead of deleting (the referential integrity check below is currently disabled)
if not args.force:
# referenced = any(
# odoo.env[rel_model].search_count([(field, "=", record_id)])
# for rel_model, fields in related.items()
# for field in fields
# )
# if referenced:
model.write([record_id], {"active": False})
archived += 1
color_log.Show(OK, f"Archived {args.base_model} ID {record_id}")
continue
else:
model.unlink([record_id])
deleted += 1
color_log.Show(OK, f"Deleted {args.base_model} ID {record_id}")
except odoorpc.error.RPCError as e:
color_log.Show(WARNING, f"Error processing {record_id}: {e}")
skipped += 1
except Exception as e:
color_log.Show(WARNING, f"Unexpected error with {record_id}: {e}")
skipped += 1
return deleted, archived, skipped
def main():
"""Main execution flow."""
args = parse_arguments()
odoo = connect_to_odoo(args)
# Validate model exists
if args.base_model not in odoo.env:
color_log.Show(FAIL, f"Model {args.base_model} does not exist")
sys.exit(1)
# Retrieve records
domain = ast.literal_eval(args.domain)
record_ids = odoo.env[args.base_model].search(
domain, offset=0, limit=None, order="id"
)
if not record_ids:
color_log.Show(
WARNING, f"No records found in {args.base_model} with domain {domain}"
)
return
color_log.Show(INFO, f"Found {len(record_ids)} records to process")
# Prepare related models data
related = get_related_fields(odoo, args)
if related and args.verbose:
color_log.Show(INFO, f"Related models: {json.dumps(related, indent=2)}")
# Parallel processing
batches = chunker(record_ids, args.chunk_size)
color_log.Show(
INFO, f"Processing {len(batches)} batches with {args.process_size} workers"
)
total_stats = [0, 0, 0]
with mp.Pool(args.process_size) as pool:
results = pool.imap_unordered(
partial(process_batch, args, related=related), batches
)
for deleted, archived, skipped in results:
total_stats[0] += deleted
total_stats[1] += archived
total_stats[2] += skipped
# Final report
color_log.Show(OK, "\nOperation summary:")
color_log.Show(OK, f"Total deleted: {total_stats[0]}")
color_log.Show(OK, f"Total archived: {total_stats[1]}")
color_log.Show(OK, f"Total skipped: {total_stats[2]}")
color_log.Show(
OK, f"Success rate: {(total_stats[0]+total_stats[1])/len(record_ids)*100:.1f}%"
)
if args.dry_run:
color_log.Show(WARNING, "Dry-run mode: No changes were made to the database")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
color_log.Show(FAIL, "\nOperation cancelled by user")
sys.exit(1)

244
replace_attrs.py Normal file
View File

@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
import re
from bs4 import formatter, BeautifulSoup as bs
from pathlib import Path
xml_4indent_formatter = formatter.XMLFormatter(indent=4)
NEW_ATTRS = {'required', 'invisible', 'readonly', 'column_invisible'}
percent_d_regex = re.compile(r"%\('?\"?[\w\.\d_]+'?\"?\)d")
def get_files_recursive(path):
return (str(p) for p in Path(path).glob('**/*.xml') if p.is_file())
root_dir = input('Enter root directory to check (empty for current directory) : ')
root_dir = root_dir or '.'
all_xml_files = get_files_recursive(root_dir)
def normalize_domain(domain):
"""Normalize Domain, taken from odoo/osv/expression.py -> just the part so that & operators are added where needed.
After that, we can use a part of the def parse() from the same file to manage parenthesis for and/or"""
if len(domain) == 1:
return domain
result = []
expected = 1 # expected number of expressions
op_arity = {'!': 1, '&': 2, '|': 2}
for token in domain:
if expected == 0: # more than expected, like in [A, B]
result[0:0] = ['&'] # put an extra '&' in front
expected = 1
if isinstance(token, (list, tuple)): # domain term
expected -= 1
token = tuple(token)
else:
expected += op_arity.get(token, 0) - 1
result.append(token)
return result
def stringify_leaf(leaf):
stringify = ''
switcher = False
# Replace operators not supported in python (=, like, ilike)
operator = str(leaf[1])
if operator == '=':
operator = '=='
elif 'like' in operator:
if 'not' in operator:
operator = 'not in'
else:
operator = 'in'
switcher = True
# Take left operand, never to add quotes (should be python object / field)
left_operand = leaf[0]
# Take care of right operand, don't add quotes if it's list/tuple/set/boolean/number, check if we have a true/false/1/0 string tho.
right_operand = leaf[2]
if right_operand in ('True', 'False', '1', '0') or type(right_operand) in (list, tuple, set, int, float, bool):
right_operand = str(right_operand)
else:
right_operand = "'"+right_operand+"'"
stringify = "%s %s %s" % (right_operand if switcher else left_operand, operator, left_operand if switcher else right_operand)
return stringify
def stringify_attr(stack):
if stack in (True, False, 'True', 'False', 1, 0, '1', '0'):
return stack
last_parenthesis_index = max(index for index, item in enumerate(stack[::-1]) if item not in ('|', '!'))
stack = normalize_domain(stack)
stack = stack[::-1]
result = []
for index, leaf_or_operator in enumerate(stack):
if leaf_or_operator == '!':
expr = result.pop()
result.append('(not (%s))' % expr)
elif leaf_or_operator == '&' or leaf_or_operator == '|':
left = result.pop()
# In case of a single | or single & , we expect that it's a tag that have an attribute AND a state
# the state will be added as OR in states management
try:
right = result.pop()
except IndexError:
res = left + ('%s' % ' and' if leaf_or_operator=='&' else ' or')
result.append(res)
continue
form = '(%s %s %s)'
if index > last_parenthesis_index:
form = '%s %s %s'
result.append(form % (left, 'and' if leaf_or_operator=='&' else 'or', right))
else:
result.append(stringify_leaf(leaf_or_operator))
result = result[0]
return result
def get_new_attrs(attrs):
new_attrs = {}
attrs_dict = eval(attrs.strip())
for attr in NEW_ATTRS:
if attr in attrs_dict.keys():
new_attrs[attr] = stringify_attr(attrs_dict[attr])
ordered_attrs = {attr: new_attrs[attr] for attr in NEW_ATTRS if attr in new_attrs}
return ordered_attrs
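# Illustrative conversion (assumed input): attrs="{'invisible': [('state', '=', 'done')]}"
# yields {'invisible': "state == 'done'"}, written back as a plain invisible attribute.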
# Prettify puts <attribute> on three lines (1/ opening tag, 2/ text, 3/ closing tag), not very cool.
# Taken from https://stackoverflow.com/questions/55962146/remove-line-breaks-and-spaces-around-span-elements-with-python-regex
# And changed to avoid putting ALL one line, and only manage <attribute>, as it's the only one messing stuff here
# Kinda ugly to use the 3 types of tags but tbh I keep it like this while I have no time for a regex replace keeping the name="x" :p
def prettify_output(html):
for attr in NEW_ATTRS:
html = re.sub(f'<attribute name="{attr}">[ \n]+', f'<attribute name="{attr}">', html)
html = re.sub('[ \n]+</attribute>', '</attribute>', html)
html = re.sub(r'<field name="([a-z_]+)">[ \n]+', r'<field name="\1">', html)
html = re.sub(r'[ \n]+</field>', r'</field>', html)
return html
autoreplace = input('Do you want to auto-replace attributes ? (y/n) (empty == no) (will not ask confirmation for each file) : ') or 'n'
nofilesfound = True
ok_files = []
nok_files = []
for xml_file in all_xml_files:
try:
with open(xml_file, 'rb') as f:
contents = f.read().decode('utf-8')
if 'attrs' not in contents and 'states' not in contents:
continue
counter_for_percent_d_replace = 1
percent_d_results = {}
for percent_d in percent_d_regex.findall(contents):
contents = contents.replace(percent_d, "'REPLACEME%s'" % counter_for_percent_d_replace)
percent_d_results[counter_for_percent_d_replace] = percent_d
counter_for_percent_d_replace += 1
soup = bs(contents, 'xml')
tags_with_attrs = soup.select('[attrs]')
attribute_tags_name_attrs = soup.select('attribute[name="attrs"]')
tags_with_states = soup.select('[states]')
attribute_tags_name_states = soup.select('attribute[name="states"]')
if not (tags_with_attrs or attribute_tags_name_attrs or\
tags_with_states or attribute_tags_name_states):
continue
print('\n################################################################')
print('##### Taking care of file -> %s' % xml_file)
print('\n########### Current tags found ###\n')
for t in tags_with_attrs + attribute_tags_name_attrs + tags_with_states + attribute_tags_name_states:
print(t)
nofilesfound = False
# Management of tags that have attrs=""
for tag in tags_with_attrs:
attrs = tag['attrs']
new_attrs = get_new_attrs(attrs)
del tag['attrs']
for new_attr in new_attrs.keys():
tag[new_attr] = new_attrs[new_attr]
# Management of attributes name="attrs"
attribute_tags_after = []
for attribute_tag in attribute_tags_name_attrs:
new_attrs = get_new_attrs(attribute_tag.text)
for new_attr in new_attrs.keys():
new_tag = soup.new_tag('attribute')
new_tag['name'] = new_attr
new_tag.append(str(new_attrs[new_attr]))
attribute_tags_after.append(new_tag)
attribute_tag.insert_after(new_tag)
attribute_tag.decompose()
# Management of tags that have states=""
for state_tag in tags_with_states:
base_invisible = ''
if 'invisible' in state_tag.attrs and state_tag['invisible']:
base_invisible = state_tag['invisible']
if not (base_invisible.endswith('or') or base_invisible.endswith('and')):
base_invisible = base_invisible + ' or '
else:
base_invisible = base_invisible + ' '
invisible_attr = "state not in [%s]" % ','.join(("'" + state.strip() + "'") for state in state_tag['states'].split(','))
state_tag['invisible'] = base_invisible + invisible_attr
del state_tag['states']
# Management of attributes name="states"
attribute_tags_states_after = []
for attribute_tag_states in attribute_tags_name_states:
states = attribute_tag_states.text
existing_invisible_tag = False
# I don't know why, looking for attribute[name="invisible"] does not work,
# but if it exists, I can find it with findAll attribute -> loop to name="invisible"
for tag in attribute_tag_states.parent.findAll('attribute'):
if tag['name'] == 'invisible':
existing_invisible_tag = tag
break
if not existing_invisible_tag:
existing_invisible_tag = soup.new_tag('attribute')
existing_invisible_tag['name'] = 'invisible'
if existing_invisible_tag.text:
states_to_add = 'state not in [%s]' % (
','.join(("'" + state.strip() + "'") for state in states.split(','))
)
if existing_invisible_tag.text.endswith('or') or existing_invisible_tag.text.endswith('and'):
new_invisible_text = '%s %s' % (existing_invisible_tag.text, states_to_add)
else:
new_invisible_text = ' or '.join([existing_invisible_tag.text, states_to_add])
else:
new_invisible_text = 'state not in [%s]' % (
','.join(("'" + state.strip() + "'") for state in states.split(','))
)
existing_invisible_tag.string = new_invisible_text
attribute_tag_states.insert_after(existing_invisible_tag)
attribute_tag_states.decompose()
attribute_tags_states_after.append(existing_invisible_tag)
print('\n########### Will be replaced by ###\n')
for t in tags_with_attrs + attribute_tags_after + tags_with_states + attribute_tags_states_after:
print(t)
print('################################################################\n')
if autoreplace.lower()[0] == 'n':
confirm = input('Do you want to replace? (y/n) (empty == no) : ') or 'n'
else:
confirm = 'y'
if confirm.lower()[0] == 'y':
with open(xml_file, 'wb') as rf:
html = soup.prettify(formatter=xml_4indent_formatter)
html = prettify_output(html)
for percent_d_result in percent_d_results.keys():
html = html.replace("'REPLACEME%s'" % percent_d_result, percent_d_results[percent_d_result])
rf.write(html.encode('utf-8'))
ok_files.append(xml_file)
except Exception as e:
nok_files.append((xml_file, e))
print('\n################################################')
print('################## Run Debug ##################')
print('################################################')
if nofilesfound:
print('No XML Files with "attrs" or "states" found in dir "%s"' % root_dir)
print('Succeeded on files')
for file in ok_files:
print(file)
if not ok_files:
print('No files')
print('')
print('Failed on files')
for file in nok_files:
print(file[0])
print('Reason: ', file[1])
if not nok_files:
print('No files')

516
requirements-check.py Executable file

@@ -0,0 +1,516 @@
#!/usr/bin/env python
"""
Checks versions from the requirements files against distribution-provided
versions, taking distribution's Python version in account e.g. if checking
against a release which bundles Python 3.5, checks the 3.5 version of
requirements.
* only shows requirements for which at least one release diverges from the
matching requirements version
* empty or green cells mean that specific release matches its requirement (happens when
checking multiple releases: one of the other releases may mismatch its
requirements, necessitating showing the row)
This script was heavily reworked but is not in a final version:
TODO:
- add legends
- better management of cache
- add meta info on cells (mainly to generate a better html report)
- warn/ko reason
- wheel + link
- original debian package name + link
...
"""
import argparse
import gzip
import itertools
import json
import os
import re
import shutil
import tempfile
try:
import ansitoimg
except ImportError:
ansitoimg = None
from abc import ABC, abstractmethod
from pathlib import Path
from sys import stderr, stdout
from typing import Dict, List, Optional, Tuple
from urllib.request import HTTPError
from urllib.request import urlopen as _urlopen
from packaging.markers import Marker
from packaging.requirements import Requirement
from packaging.tags import mac_platforms # noqa: PLC2701
from packaging.utils import canonicalize_name
from pip._internal.index.package_finder import (
LinkEvaluator, # noqa: PLC2701
)
from pip._internal.models.link import Link # noqa: PLC2701
from pip._internal.models.target_python import TargetPython # noqa: PLC2701
Version = Tuple[int, ...]
# shared between Debian and Ubuntu
SPECIAL = {
'pytz': 'tz',
'libsass': 'libsass-python',
}
SUPPORTED_FORMATS = ('txt', 'ansi', 'svg', 'html', 'json')
PLATFORM_CODES = ('linux', 'win32', 'darwin')
PLATFORM_NAMES = ('Linux', 'Win', 'OSX')
def urlopen(url):
file_name = "".join(c if c.isalnum() else '_' for c in url)
os.makedirs('/tmp/package_versions_cache/', exist_ok=True)
file_path = f'/tmp/package_versions_cache/{file_name}'
if not os.path.isfile(file_path):
response = _urlopen(url)
with open(file_path, 'wb') as fw:
fw.write(response.read())
return open(file_path, 'rb') # noqa: SIM115
def parse_version(vstring: str) -> Optional[Version]:
if not vstring:
return None
return tuple(map(int, vstring.split('.')))
def cleanup_debian_version(s: str) -> str:
""" Try to strip the garbage from the version string, just remove everything
following the first `+`, `~` or `-`
"""
return re.match(r'''
(?:\d+:)? # debian crud prefix
(.*?) # the shit we actually want
(?:~|\+|-|\.dfsg)
.*
''', s, flags=re.VERBOSE)[1]
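# e.g. cleanup_debian_version('2:1.2.3+dfsg-4') -> '1.2.3'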
class PipPackage:
def __init__(self, name):
self.name = name
infos = json.load(urlopen(f'https://pypi.org/pypi/{name}/json'))
self.info = infos['info']
self.last_serial = infos['last_serial']
self.releases = infos['releases']
self.urls = infos['urls']
self.vulnerabilities = infos['vulnerabilities']
def has_wheel_for(self, version, python_version, platform):
if version is None:
return (False, False, False)
py_version_info = python_version.split('.')
if len(py_version_info) == 2:
py_version_info = (py_version_info[0], py_version_info[1], 0)
releases = self.releases
has_wheel_for_version = False
has_any_wheel = False
has_wheel_in_another_version = False
platforms = None
if platform == 'darwin':
platforms = list(mac_platforms((15, 0), 'x86_64'))
elif platform == 'win32':
platforms = ['win32', 'win-amd64']
else:
assert platform == 'linux'
target_python = TargetPython(
platforms=platforms,
py_version_info=py_version_info,
abis=None,
implementation=None,
)
le = LinkEvaluator(
project_name=self.name,
canonical_name=canonicalize_name(self.name),
formats={"binary", "source"},
target_python=target_python,
allow_yanked=True,
ignore_requires_python=False,
)
for release in releases[version]:
if release['filename'].endswith('.whl'):
has_any_wheel = True
is_candidate, _result = le.evaluate_link(Link(
comes_from=None,
url=release['url'],
requires_python=release['requires_python'],
yanked_reason=release['yanked_reason'],
))
if is_candidate:
if release['filename'].endswith('.whl'):
has_wheel_for_version = has_wheel_in_another_version = True
break
if not has_wheel_for_version and has_any_wheel:
# TODO, we should prefer a version matching the one from a distro
for rel_version, rel in releases.items():
for release in rel:
if not release['filename'].endswith('.whl'):
continue
if any(not s.isdigit() for s in rel_version.split('.')) or parse_version(rel_version) <= parse_version(version):
continue
is_candidate, _result = le.evaluate_link(Link(
comes_from=None,
url=release['url'],
requires_python=release['requires_python'],
yanked_reason=release['yanked_reason'],
))
if is_candidate:
has_wheel_in_another_version = True
stderr.write(f'WARNING: Wheel found for {self.name} ({python_version} {platform}) in {rel_version}\n')
return (has_wheel_for_version, has_any_wheel, has_wheel_in_another_version)
return (has_wheel_for_version, has_any_wheel, has_wheel_in_another_version)
class Distribution(ABC):
def __init__(self, release):
self._release = release
@abstractmethod
def get_version(self, package: str) -> Optional[Version]:
...
def __str__(self):
return f'{type(self).__name__.lower()} {self._release}'
@classmethod
def get(cls, name):
try:
return next(
c
for c in cls.__subclasses__()
if c.__name__.lower() == name
)
except StopIteration:
msg = f"Unknown distribution {name!r}"
raise ValueError(msg)
class Debian(Distribution):
def get_version(self, package):
""" Try to find which version of ``package`` is in Debian release {release}
"""
package = SPECIAL.get(package, package)
# try the python prefix first: some packages have a native of foreign $X and
# either the bindings or a python equivalent at python-X, or just a name
# collision
prefixes = ['python-', '']
if package.startswith('python'):
prefixes = ['']
for prefix in prefixes:
try:
res = json.load(urlopen(f'https://sources.debian.org/api/src/{prefix}{package}/'))
except HTTPError:
return 'failed'
if res.get('error') is None:
break
if res.get('error'):
return
try:
return next(
parse_version(cleanup_debian_version(distr['version']))
for distr in res['versions']
if distr['area'] == 'main'
if self._release.lower() in distr['suites']
)
except StopIteration:
return
class Ubuntu(Distribution):
""" Ubuntu doesn't have an API, instead it has a huge text file
"""
def __init__(self, release):
super().__init__(release)
self._packages = {}
# ideally we should request the proper Content-Encoding but PUC
# apparently does not care, and returns a somewhat funky
# content-encoding (x-gzip) anyway
data = gzip.open(
urlopen(f'https://packages.ubuntu.com/source/{release}/allpackages?format=txt.gz'),
mode='rt', encoding='utf-8',
)
for line in itertools.islice(data, 6, None): # first 6 lines is garbage header
# ignore the restricted, security, universe, multiverse tags
m = re.match(r'(\S+) \(([^)]+)\)', line.strip())
assert m, f"invalid line {line.strip()!r}"
self._packages[m[1]] = m[2]
def get_version(self, package):
package = SPECIAL.get(package, package)
for prefix in ['python3-', 'python-', '']:
v = self._packages.get(f'{prefix}{package}')
if v:
return parse_version(cleanup_debian_version(v))
return None
def _strip_comment(line):
return line.split('#', 1)[0].strip()
def parse_requirements(reqpath: Path) -> Dict[str, List[Tuple[str, Marker]]]:
""" Parses a requirement file to a dict of {package: [(version, markers)]}
The env markers express *whether* that specific dep applies.
"""
reqs = {}
with reqpath.open('r', encoding='utf-8') as f:
for req_line in f:
req_line = _strip_comment(req_line)
if not req_line:
continue
requirement = Requirement(req_line)
version = None
if requirement.specifier:
if len(requirement.specifier) > 1:
raise NotImplementedError('multi spec not supported yet')
version = next(iter(requirement.specifier)).version
reqs.setdefault(requirement.name, []).append((version, requirement.marker))
return reqs
def ok(text):
return f'\033[92m{text}\033[39m'
def em(text):
return f'\033[94m{text}\033[39m'
def warn(text):
return f'\033[93m{text}\033[39m'
def ko(text):
return f'\033[91m{text}\033[39m'
def default(text):
return text
def main(args):
checkers = [
Distribution.get(distro)(release)
for version in args.release
for (distro, release) in [version.split(':')]
]
stderr.write("Fetch Python versions...\n")
pyvers = [
'.'.join(map(str, checker.get_version('python3-defaults')[:2]))
for checker in checkers
]
uniq = sorted(set(pyvers), key=parse_version)
platforms = PLATFORM_NAMES if args.check_pypi else PLATFORM_NAMES[:1]
platform_codes = PLATFORM_CODES if args.check_pypi else PLATFORM_CODES[:1]
platform_headers = ['']
python_headers = ['']
table = [platform_headers, python_headers]
# requirements headers
for v in uniq:
for p in platforms:
platform_headers.append(p)
python_headers.append(v)
# distro headers
for checker, version in zip(checkers, pyvers):
platform_headers.append(checker._release[:5])
python_headers.append(version)
reqs = parse_requirements((Path.cwd() / __file__).parent.parent / 'requirements.txt')
if args.filter:
reqs = {r: o for r, o in reqs.items() if any(f in r for f in args.filter.split(','))}
for req, options in reqs.items():
if args.check_pypi:
pip_infos = PipPackage(req)
row = [req]
seps = [' || ']
byver = {}
for pyver in uniq:
# FIXME: when multiple options apply, check which pip uses
# (first-matching, best-matching, latest, ...)
seps[-1] = ' || '
for platform in platform_codes:
platform_version = 'none'
for version, markers in options:
if not markers or markers.evaluate({
'python_version': pyver,
'sys_platform': platform,
}):
if platform == 'linux':
byver[pyver] = version
platform_version = version
break
deco = None
if args.check_pypi:
if platform_version == 'none':
deco = 'ok'
else:
has_wheel_for_version, has_any_wheel, has_wheel_in_another_version = pip_infos.has_wheel_for(platform_version, pyver, platform)
if has_wheel_for_version:
deco = 'ok'
elif has_wheel_in_another_version:
deco = 'ko'
elif has_any_wheel:
deco = 'warn'
if deco in ("ok", None):
if byver.get(pyver, 'none') != platform_version:
deco = 'em'
req_ver = platform_version or 'any'
row.append((req_ver, deco))
seps.append(' | ')
seps[-1] = ' |#| '
# this requirement doesn't apply, ignore
if not byver and not args.all:
continue
for i, c in enumerate(checkers):
req_version = byver.get(pyvers[i], 'none') or 'any'
check_version = '.'.join(map(str, c.get_version(req.lower()) or [])) or None
if req_version != check_version:
deco = 'ko'
if req_version == 'none':
deco = 'ok'
elif req_version == 'any':
if check_version is None:
deco = 'ok'
elif check_version is None:
deco = 'ko'
elif parse_version(req_version) >= parse_version(check_version):
deco = 'warn'
row.append((check_version or '</>', deco))
elif args.all:
row.append((check_version or '</>', 'ok'))
else:
row.append('')
seps.append(' |#| ')
table.append(row)
seps[-1] = ' ' # remove last column separator
stderr.write('\n')
# evaluate width of columns
sizes = [0] * len(table[0])
for row in table:
sizes = [
max(s, len(cell[0] if isinstance(cell, tuple) else cell))
for s, cell in zip(sizes, row)
]
output_format = 'ansi'
if args.format:
output_format = args.format
assert output_format in SUPPORTED_FORMATS
elif args.output:
output_format = 'txt'
ext = args.output.split('.')[-1]
if ext in SUPPORTED_FORMATS:
output_format = ext
if output_format == 'json':
output = json.dumps(table)
else:
output = ''
# format table
for row in table:
output += ' '
for cell, width, sep in zip(row, sizes, seps):
cell_content = cell
deco = default
if isinstance(cell, tuple):
cell_content, level = cell
if output_format == 'txt' or level is None:
deco = default
elif level == 'ok':
deco = ok
elif level == 'em':
deco = em
elif level == 'warn':
deco = warn
else:
deco = ko
output += deco(f'{cell_content:<{width}}') + sep
output += '\n'
if output_format in ('svg', 'html'):
if not ansitoimg:
output_format = 'ansi'
stderr.write(f'Missing ansitoimg for {output_format} format, switching to ansi\n')
else:
convert = ansitoimg.ansiToSVG
if output_format == 'html':
convert = ansitoimg.ansiToHTML
with tempfile.NamedTemporaryFile() as tmp:
convert(output, tmp.name, width=(sum(sizes) + sum(len(sep) for sep in seps)), title='requirements-check.py')
output = tmp.read().decode()
# remove the macOS-like window buttons that ansitoimg adds
output = output.replace('''<g transform="translate(26,22)">
<circle cx="0" cy="0" r="7" fill="#ff5f57"/>
<circle cx="22" cy="0" r="7" fill="#febc2e"/>
<circle cx="44" cy="0" r="7" fill="#28c840"/>
</g>''', "")
if args.output:
with open(args.output, 'w', encoding='utf8') as f:
f.write(output)
else:
stdout.write(output)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
'release', nargs='+',
help="Release to check against, should use the format '{distro}:{release}' e.g. 'debian:sid'"
)
parser.add_argument(
'-a', '--all', action="store_true",
help="Display all requirements even if it matches",
)
parser.add_argument(
'-o', '--output', help="output path",
)
parser.add_argument(
'-f', '--format', help=f"Output format, one of: {', '.join(SUPPORTED_FORMATS)}",
)
parser.add_argument(
'--update-cache', action="store_true",
help="Ignore the existing package version cache and update them",
)
parser.add_argument(
'--check-pypi', action="store_true",
help="Check wheel packages",
)
parser.add_argument(
'--filter',
help="Comma sepaated list of package to check",
)
args = parser.parse_args()
if args.update_cache:
shutil.rmtree('/tmp/package_versions_cache/', ignore_errors=True)
main(args)

102
restore_db.py Normal file
View File

@ -0,0 +1,102 @@
#!/usr/bin/env python
import odoorpc
import color_log
import argparse
import sys
import os
# Default configuration
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8069
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
RESTORE_DIR = "odoo_backups"
OK, FAIL, INFO, WARNING = 0, 1, 2, 3
def connect_to_odoo(args: argparse.Namespace) -> odoorpc.ODOO:
"""Establish and verify Odoo connection."""
try:
odoo = odoorpc.ODOO(args.host, port=args.port)
color_log.Show(INFO, f"Available databases: {odoo.db.list()}")
# odoo.login(args.db_name, args.username, args.password)
# color_log.Show(OK, f"Connected to {args.host}:{args.port}, DB: {args.db_name}")
return odoo
except odoorpc.error.RPCError as e:
color_log.Show(FAIL, f"Login failed: {e}")
sys.exit(1)
except Exception as e:
color_log.Show(FAIL, f"Connection error: {e}")
sys.exit(1)
def parse_arguments() -> argparse.Namespace:
"""Parse and validate command-line arguments."""
parser = argparse.ArgumentParser(description="Restore all Odoo databases.")
parser.add_argument("--host", default=DEFAULT_HOST, help="Odoo server host")
parser.add_argument(
"--port", type=int, default=DEFAULT_PORT, help="Odoo server port"
)
parser.add_argument(
"--admin-password", required=True, help="Odoo master admin password"
)
parser.add_argument(
"--database",
nargs="*",
help="Specific databases to restore (leave empty to restore all databases)",
)
return parser.parse_args()
def restore_database(odoo: odoorpc.ODOO, db_name: str, admin_password: str):
"""Restore a single Odoo database."""
try:
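# expects a plain "<db_name>.zip" in RESTORE_DIR; note that backup_db.py
# writes "<db_name>-MM-DD-YYYY.zip", so dated backups may need renaming first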
backup_path = os.path.join(RESTORE_DIR, f"{db_name}.zip")
if not os.path.exists(backup_path):
print(f"Backup file for {db_name} not found: {backup_path}")
return
with open(backup_path, "rb") as f:
print(f"Restoring database: {db_name} from {backup_path}...")
timeout_backup = odoo.config['timeout']
odoo.config['timeout'] = 7200 # Timeout set to 2 hours
odoo.db.restore(admin_password, db_name, f)
odoo.config['timeout'] = timeout_backup
print(f"Database {db_name} restored successfully.")
except Exception as e:
print(f"Failed to restore {db_name}: {e}")
def restore_all_databases(odoo: odoorpc.ODOO, admin_password: str):
"""Restore all databases from backup files in the restore directory."""
try:
backup_files = [f for f in os.listdir(RESTORE_DIR) if f.endswith(".zip")]
print("Backup files found:", backup_files)
for backup_file in backup_files:
db_name = os.path.splitext(backup_file)[0]
restore_database(odoo, db_name, admin_password)
except Exception as e:
print(f"Error restoring databases: {e}")
def main():
"""Main execution flow."""
args = parse_arguments()
odoo = connect_to_odoo(args)
if args.database:
for db_name in args.database:
restore_database(odoo, db_name, args.admin_password)
else:
restore_all_databases(odoo, args.admin_password)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
color_log.Show(FAIL, "\nOperation cancelled by user")
sys.exit(1)

3
update_tag.sh Executable file
View File

@ -0,0 +1,3 @@
#!/usr/bin/bash
set -e
[ -n "$1" ] || { echo "Usage: $0 <tag>" >&2; exit 1; }
sed -i "s/TAG := \$(shell rev-parse --abbrev-ref HEAD)/TAG := $1/g" Makefile

92
upgrade_module.py Normal file
View File

@ -0,0 +1,92 @@
#!/usr/bin/env python
"""Install/update/uninstall specified odoo module."""
import odoorpc
import argparse
USER = "admin"
PASSWORD = "admin"
HOST = "localhost"
PORT = "8069"
DB = "odoodb"
def prepare_args():
"""Prepare arguments for module action RPC call."""
parser = argparse.ArgumentParser(
description="Run modules install, upgrade or uninstall."
)
parser.add_argument(
"-i",
"--install",
help="Comma separated list of modules to install",
)
parser.add_argument(
"-u",
"--upgrade",
help="Comma separated list of modules to upgrade",
)
parser.add_argument(
"-del",
"--delete",
help="Comma separated list of modules to uninstall",
)
parser.add_argument(
"--user",
help="User to log in with",
default=USER,
)
parser.add_argument(
"--password",
help="Password to log in with",
default=PASSWORD,
)
parser.add_argument(
"--host",
help="Host to log in to",
default=HOST,
)
parser.add_argument(
"--port",
type=int,
help="Odoo port",
default=PORT,
)
parser.add_argument(
"-d",
"--database",
help="Database name to log in to",
default=DB,
)
return parser.parse_args()
def login(user, password, host, port, database):
"""Login to Odoo database and return connection object."""
odoo = odoorpc.ODOO(host, port=port)
odoo.login(database, user, password)
return odoo
def _find_modules(env, module_names):
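"""Return an ir.module.module recordset matching the comma-separated names."""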
IrModuleModule = env["ir.module.module"]
modules = module_names.replace(" ", "").split(",")
module_ids = IrModuleModule.search([("name", "in", modules)])
return IrModuleModule.browse(module_ids)
def trigger_action(env, module_names, action):
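"""Call button_immediate_<action> (install, upgrade or uninstall) on the given modules."""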
modules = _find_modules(env, module_names)
method = getattr(modules, f"button_immediate_{action}")
return method()
if __name__ == "__main__":
args = prepare_args()
odoo = login(args.user, args.password, args.host, args.port, args.database)
env = odoo.env
if args.install:
trigger_action(env, args.install, "install")
if args.upgrade:
trigger_action(env, args.upgrade, "upgrade")
if args.delete:
trigger_action(env, args.delete, "uninstall")