70 lines
2.8 KiB
Python
70 lines
2.8 KiB
Python
from services.git.handler import GitHandler
|
|
from services.odoo.connection import OdooConnection
|
|
import subprocess
|
|
|
|
|
|
class OdooModuleManager:
|
|
def __init__(self, config_path: str = "config/settings.yaml"):
|
|
self.config = OdooConnection(config_path)
|
|
|
|
def _manage_module(
|
|
self, action: str, instance_name: str = None, module_names: list = None
|
|
) -> None:
|
|
"""Generic method to install, uninstall, or upgrade modules."""
|
|
if action not in {"install", "uninstall", "upgrade"}:
|
|
raise ValueError(f"Invalid action: {action}")
|
|
|
|
self.config.connect(instance_name)
|
|
|
|
for instance in self.config.get_instances():
|
|
if instance_name and instance["name"] != instance_name:
|
|
continue
|
|
|
|
print(f"Processing instance: {instance['name']}")
|
|
for module_name in module_names:
|
|
try:
|
|
print(
|
|
f"{action.capitalize()}ing module: {module_name} in {instance['name']}"
|
|
)
|
|
|
|
module_ids = self.config.execute(
|
|
instance["name"],
|
|
"ir.module.module",
|
|
"search",
|
|
[("name", "=", module_name)],
|
|
)
|
|
|
|
if not module_ids:
|
|
print(
|
|
f"Module {module_name} not found in {instance['name']}, skipping."
|
|
)
|
|
continue
|
|
|
|
button_action = f"button_immediate_{action}"
|
|
self.config.execute(
|
|
instance["name"],
|
|
"ir.module.module",
|
|
button_action,
|
|
module_ids, # Pass list directly instead of wrapping in another list
|
|
)
|
|
|
|
print(
|
|
f"Module {module_name} {action}ed successfully in {instance['name']}"
|
|
)
|
|
except Exception as e:
|
|
print(
|
|
f"Error while {action}ing {module_name} in {instance['name']}: {e}"
|
|
)
|
|
|
|
def install(self, instance_name: str = None,module_names: list = None) -> None:
|
|
"""Install multiple modules for the specified instance(s)."""
|
|
self._manage_module("install", instance_name,module_names)
|
|
|
|
def uninstall(self, instance_name: str = None,module_names: list = None) -> None:
|
|
"""Uninstall multiple modules for the specified instance(s)."""
|
|
self._manage_module("uninstall", instance_name,module_names)
|
|
|
|
def upgrade(self, instance_name: str = None,module_names: list = None) -> None:
|
|
"""Upgrade multiple modules for the specified instance(s)."""
|
|
self._manage_module("upgrade", instance_name,module_names)
|