refactor: remove SSH handling from GitHandler and update cmd service usage

This commit is contained in:
KaySar12 2025-05-29 09:54:27 +07:00
parent 49a722e0a1
commit 6a8c514817
4 changed files with 48 additions and 225 deletions

View File

@ -6,7 +6,7 @@ import sys
import fcntl
import select
from ..services import config as Config
from ..lib import color_log
from ..common import color_log
def set_nonblocking(fd):

0
services/cmd/__init__.py Normal file
View File

View File

@ -10,146 +10,47 @@ class SSHHandler:
def __init__(self, config_path="config/settings.yaml"):
self.config = OdooConnection(config_path)
def _check_sshpass():
"""Check if sshpass is installed in the system."""
return shutil.which("sshpass") is not None
def _get_remote_command(self, instance, cmd):
"""Generate SSH command for remote execution."""
host = instance["host"]
ssh_settings = instance.get("ssh", {})
ssh_user = ssh_settings.get("user", "root")
ssh_key_path = ssh_settings.get("key_path")
def _build_command(host, cmd, ssh_user="root", ssh_key_path=None, passphrase=None):
"""
Build an SSH command with the given parameters.
local_host = host in ["localhost", "127.0.0.1"]
Args:
host (str): The target host
cmd (str): The command to execute
ssh_user (str): SSH user (default: root)
ssh_key_path (str, optional): Path to SSH key file
passphrase (str, optional): Passphrase for SSH key
if not local_host:
if not ssh_key_path:
return f"ssh -t {ssh_user}@{host} 'sudo -s bash -c \"{cmd}\"'"
else:
return f"ssh -i {ssh_key_path} {ssh_user}@{host} 'sudo -s bash -c \"{cmd}\"'"
return cmd
Returns:
tuple: (args, env) where args is a list of command arguments, and env is a dictionary of environment variables
"""
# For local execution
if host in ["localhost", "127.0.0.1"]:
return ["bash", "-c", cmd], None
# Build SSH command as a list of arguments
args = [
"ssh",
"-o",
"StrictHostKeyChecking=no",
"-o",
"UserKnownHostsFile=/dev/null",
"-t",
]
if ssh_key_path:
args.extend(["-i", ssh_key_path])
args.append(f"{ssh_user}@{host}")
# Quote the command safely for remote execution
remote_cmd = f"sudo -s bash -c {shlex.quote(cmd)}"
args.append(remote_cmd)
env = None
# Handle passphrase securely with sshpass
if passphrase:
if not SSHHandler._check_sshpass():
raise RuntimeError(
"sshpass is required for passphrase authentication but not installed"
)
args = ["sshpass", "-e"] + args
env = {"SSHPASS": passphrase}
return args, env
def execute_command(args, instance_name, capture_output=True, env=None):
"""
Execute a command and handle errors.
Args:
args (list): List of command arguments
instance_name (str): Name of the instance (for logging)
capture_output (bool): Whether to capture command output
env (dict, optional): Environment variables to set for the command
Returns:
str: Command output if capture_output is True
Raises:
subprocess.CalledProcessError: If command execution fails
"""
def _execute_command(self, cmd, instance_name):
"""Execute a shell command and handle errors."""
try:
# Log the command being executed
color_log.Show("INFO", f"Executing command: {' '.join(args)}")
# Create a copy of the current environment
cmd_env = os.environ.copy()
# Update with any additional environment variables
if env:
cmd_env.update(env)
# Execute the command with subprocess
color_log.Show("INFO", f"Executing command: {cmd}")
result = subprocess.run(
args, check=True, capture_output=capture_output, text=True, env=cmd_env
cmd, shell=True, check=True, capture_output=True, text=True
)
# Handle output if captured
if capture_output:
if result.stdout:
print(result.stdout.strip())
if result.stderr:
print(result.stderr.strip())
# Print the output if there is any
if result.stdout:
print(result.stdout.strip())
if result.stderr:
print(result.stderr.strip())
color_log.Show("OK", f"Command executed successfully for {instance_name}")
return result.stdout if capture_output else None
return result.stdout
except subprocess.CalledProcessError as e:
# Handle errors and log output
if capture_output:
if e.stdout:
print(e.stdout.strip())
if e.stderr:
print(e.stderr.strip())
# Print the error output
if e.stdout:
print(e.stdout.strip())
if e.stderr:
print(e.stderr.strip())
color_log.Show(
"FAILED",
f"Error executing command for {instance_name}: {e}",
f"Error performing git operation for {instance_name}: {e}",
)
raise
def run_ssh_command(
host,
cmd,
instance_name,
ssh_user="root",
ssh_key_path=None,
passphrase=None,
capture_output=True,
env=None,
):
"""
Execute a command on a remote host via SSH or locally if host is localhost.
Args:
host (str): The target host
cmd (str): The command to execute
instance_name (str): Name of the instance (for logging)
ssh_user (str): SSH user (default: root)
ssh_key_path (str, optional): Path to SSH key file
passphrase (str, optional): Passphrase for SSH key
capture_output (bool): Whether to capture command output
env (dict, optional): Additional environment variables to set for the command
Returns:
str: Command output if capture_output is True
"""
# Build the command and get any required environment variables
args, cmd_env = SSHHandler._build_command(
host, cmd, ssh_user, ssh_key_path, passphrase
)
# Merge command-specific env with user-provided env
if cmd_env:
if env is None:
env = cmd_env
else:
env.update(cmd_env)
# Execute the command
return SSHHandler.execute_command(args, instance_name, capture_output, env)

View File

@ -5,41 +5,14 @@ from services.odoo.connection import OdooConnection
import common.color_log as color_log
import subprocess
from urllib.parse import urlparse
from services.cmd.ssh import SSHHandler
class GitHandler:
def __init__(self, config_path="config/settings.yaml"):
self.config = OdooConnection(config_path)
self.local_path = None # Will be set based on instance configuration
def _execute_command(self, cmd, instance_name):
"""Execute a shell command and handle errors."""
try:
color_log.Show("INFO", f"Executing command: {cmd}")
result = subprocess.run(
cmd, shell=True, check=True, capture_output=True, text=True
)
# Print the output if there is any
if result.stdout:
print(result.stdout.strip())
if result.stderr:
print(result.stderr.strip())
color_log.Show("OK", f"Command executed successfully for {instance_name}")
return result.stdout
except subprocess.CalledProcessError as e:
# Print the error output
if e.stdout:
print(e.stdout.strip())
if e.stderr:
print(e.stderr.strip())
color_log.Show(
"FAILED",
f"Error performing git operation for {instance_name}: {e}",
)
raise
self.ssh_handler = SSHHandler(config_path)
def _get_auth_url(self, repo_url, instance):
"""Add authentication to repository URL if credentials are provided."""
@ -57,22 +30,6 @@ class GitHandler:
return auth_url
return repo_url
def _get_remote_command(self, instance, cmd):
"""Generate SSH command for remote execution."""
host = instance["host"]
ssh_settings = instance.get("ssh", {})
ssh_user = ssh_settings.get("user", "root")
ssh_key_path = ssh_settings.get("key_path")
local_host = host in ["localhost", "127.0.0.1"]
if not local_host:
if not ssh_key_path:
return f"ssh -t {ssh_user}@{host} 'sudo -s bash -c \"{cmd}\"'"
else:
return f"ssh -i {ssh_key_path} {ssh_user}@{host} 'sudo -s bash -c \"{cmd}\"'"
return cmd
def clone_or_open_repo(self, instance_name=None, repo_url=None, branch=None):
"""Clone or open repository with SSH support"""
try:
@ -112,16 +69,18 @@ class GitHandler:
check_cmd = (
f"test -d {self.local_path}/.git && echo 'exists' || echo 'not exists'"
)
remote_check_cmd = self._get_remote_command(instance, check_cmd)
result = self._execute_command(remote_check_cmd, instance_name)
remote_check_cmd = self.ssh_handler._get_remote_command(instance, check_cmd)
result = self.ssh_handler._execute_command(remote_check_cmd, instance_name)
if "not exists" in result:
# Clone repository with authentication
clone_cmd = (
f"git clone -b {branch or 'main'} {auth_url} {self.local_path}"
)
remote_clone_cmd = self._get_remote_command(instance, clone_cmd)
self._execute_command(remote_clone_cmd, instance_name)
remote_clone_cmd = self.ssh_handler._get_remote_command(
instance, clone_cmd
)
self.ssh_handler._execute_command(remote_clone_cmd, instance_name)
return True
except Exception as e:
@ -183,8 +142,8 @@ class GitHandler:
# Combine commands and execute remotely
combined_cmd = " && ".join(git_commands)
remote_cmd = self._get_remote_command(instance, combined_cmd)
self._execute_command(remote_cmd, instance_name)
remote_cmd = self.ssh_handler._get_remote_command(instance, combined_cmd)
self.ssh_handler._execute_command(remote_cmd, instance_name)
return True
except Exception as e:
@ -194,64 +153,27 @@ class GitHandler:
def get_current_commit(self):
return self.repo.head.commit.hexsha if self.repo else None
def _get_command(self, instance, action, repo_url=None, branch=None):
"""
Generate the appropriate git command based on instance type and action.
Args:
instance (dict): Instance configuration
action (str): Git action (clone/pull)
repo_url (str, optional): Repository URL for clone operation
branch (str, optional): Branch name
Returns:
str: Generated git command
"""
host = instance["host"]
ssh_settings = instance.get("ssh", {})
ssh_user = ssh_settings.get("user", "root")
ssh_key_path = ssh_settings.get("key_path")
local_host = host in ["localhost", "127.0.0.1"]
# Base git command
if action == "clone":
if not repo_url:
raise ValueError("Repository URL is required for clone operation")
cmd = f"git clone -b {branch or 'main'} {repo_url} {self.local_path}"
elif action == "pull":
cmd = f"git --git-dir={self.local_path}/.git --work-tree={self.local_path} pull origin {branch or 'main'}"
else:
raise ValueError(f"Unsupported git action: {action}")
# Wrap with SSH if remote host
if not local_host:
if not ssh_key_path:
cmd = f"ssh -t {ssh_user}@{host} 'sudo {cmd}'"
else:
cmd = f"ssh -i {ssh_key_path} {ssh_user}@{host} 'sudo {cmd}'"
return cmd
def get_submodule_paths(project_path):
result = subprocess.run(
['git', 'submodule', '--quiet', 'foreach', 'echo $path'],
["git", "submodule", "--quiet", "foreach", "echo $path"],
cwd=project_path,
capture_output=True,
text=True
text=True,
)
paths = result.stdout.strip().split('\n') if result.returncode == 0 else []
paths = result.stdout.strip().split("\n") if result.returncode == 0 else []
return [os.path.join(project_path, p) for p in paths if p]
def get_git_diff(project_path, summary=True):
"""Run git diff and return the output."""
flag = "--compact-summary" if summary else ""
result = os.popen(f'cd "{project_path}" && git diff {flag}').read()
staged_changes = os.popen(f'cd "{project_path}" && git diff --cached {flag}').read()
staged_changes = os.popen(
f'cd "{project_path}" && git diff --cached {flag}'
).read()
# Combine both staged and unstaged changes
all_changes = result.strip() + "\n" + staged_changes.strip()
return all_changes.strip() if all_changes.strip() else "✓ No changes"
return all_changes.strip() if all_changes.strip() else "✓ No changes"
async def has_submodules(self):
"""Check if the repository has any submodules."""
@ -296,4 +218,4 @@ class GitHandler:
path = parts[1]
abs_path = os.path.abspath(path)
submodule_info[abs_path] = name
return submodule_info
return submodule_info