# NextERP-scripts/services/git/handler.py
#
# 222 lines
# 8.4 KiB
# Python

import asyncio
import os
import subprocess
from urllib.parse import quote, urlparse

import git

import common.color_log as color_log
from services.cmd.ssh import SSHHandler
from services.odoo.connection import OdooConnection
class GitHandler:
    """Clone, update and inspect git repositories locally or over SSH.

    Remote operations build a shell command string and hand it to
    ``SSHHandler``.  Per-instance settings come from
    ``OdooConnection.get_instance(name)`` and are read from the instance's
    ``git`` mapping: ``local_path``, ``repo_url``, ``branch``, ``username``
    and ``password``.
    """

    def __init__(self, config_path="config/settings.yaml"):
        """Load the configuration and prepare the SSH helper.

        Args:
            config_path: Path handed to ``OdooConnection``.
        """
        self.config = OdooConnection(config_path)
        self.local_path = None  # Will be set based on instance configuration
        # Nothing in this class assigns a repository object; the attribute is
        # initialised so get_current_commit() can be called safely.
        self.repo = None
        self.ssh_handler = SSHHandler()

    def _execute_command(self, cmd, instance_name="local"):
        """Run a command on the local machine and return its stdout.

        Args:
            cmd: Argument list (run without a shell) or a shell string.
            instance_name: Label used in the error message only.

        Returns:
            The captured standard output as text.

        Raises:
            RuntimeError: If the command exits with a non-zero status.
            OSError: If the executable cannot be started.
        """
        completed = subprocess.run(
            cmd,
            shell=isinstance(cmd, str),
            capture_output=True,
            text=True,
        )
        if completed.returncode != 0:
            detail = completed.stderr.strip() or completed.stdout.strip()
            raise RuntimeError(
                f"Command failed on {instance_name} "
                f"(exit code {completed.returncode}): {detail}"
            )
        return completed.stdout

    def _get_auth_url(self, repo_url, instance):
        """Add authentication to repository URL if credentials are provided.

        Credentials are only injected into ``http``/``https`` URLs; any other
        form (``ssh://``, scp-style ``git@host:path``, ...) is returned
        untouched because user:password userinfo cannot be spliced into it.
        The username and password are percent-encoded so characters such as
        ``@``, ``/`` or ``:`` cannot corrupt the URL, and userinfo already
        present in ``repo_url`` is replaced rather than duplicated.

        Args:
            repo_url: Repository URL, may be empty/``None``.
            instance: Instance mapping; credentials are read from its
                ``git`` entry (``username`` / ``password``).

        Returns:
            The URL with credentials embedded, or ``repo_url`` unchanged.
        """
        if not repo_url:
            return repo_url
        git_config = instance.get("git") or {}
        username = git_config.get("username")
        password = git_config.get("password")
        if not (username and password):
            return repo_url
        parsed_url = urlparse(repo_url)
        if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
            return repo_url
        # Drop any userinfo already present so it is not stacked twice.
        host = parsed_url.netloc.rpartition("@")[2]
        userinfo = (
            f"{quote(str(username), safe='')}:{quote(str(password), safe='')}"
        )
        return parsed_url._replace(netloc=f"{userinfo}@{host}").geturl()

    def clone_or_open_repo(self, instance_name=None, repo_url=None, branch=None):
        """Clone or open repository with SSH support.

        Without ``instance_name`` the repository is cloned on the local
        machine into ``self.local_path`` (if that path does not exist yet).
        With ``instance_name`` the instance's ``git.local_path`` is checked on
        the remote side and the repository is cloned there when no ``.git``
        directory is found.

        Args:
            instance_name: Name of the configured instance, or ``None`` for a
                local operation.
            repo_url: Repository URL; for remote operations it defaults to
                the instance's ``git.repo_url``.
            branch: Branch to clone; falls back to the instance's
                ``git.branch`` (remote only) and finally to ``main``.

        Returns:
            ``True`` on success, ``False`` if any step failed (the error is
            logged through ``color_log``).
        """
        try:
            if not instance_name:
                # Local operation
                if not self.local_path:
                    raise ValueError("local_path not set for local operation")
                if not os.path.exists(self.local_path):
                    if not repo_url:
                        raise ValueError(
                            "No repository URL provided for local clone"
                        )
                    self._execute_command(
                        [
                            "git",
                            "clone",
                            "-b",
                            branch or "main",
                            repo_url,
                            self.local_path,
                        ],
                        "local",
                    )
                return True

            # Remote operation
            instance = self.config.get_instance(instance_name)
            if not instance:
                raise ValueError(f"Instance {instance_name} not found")
            git_config = instance.get("git") or {}

            # Set local_path from instance configuration
            self.local_path = git_config.get("local_path")
            if not self.local_path:
                raise ValueError(
                    f"No local_path configured for instance {instance_name}"
                )

            # Get repository URL from instance configuration if not provided
            repo_url = repo_url or git_config.get("repo_url")
            if not repo_url:
                raise ValueError(
                    f"No repository URL configured for instance {instance_name}"
                )

            # Add authentication to repository URL
            auth_url = self._get_auth_url(repo_url, instance)

            # Check if repo exists
            check_cmd = (
                f"test -d {self.local_path}/.git && echo 'exists' || echo 'not exists'"
            )
            remote_check_cmd = self.ssh_handler._get_remote_command(
                instance, check_cmd
            )
            result = self.ssh_handler._execute_command(
                remote_check_cmd, instance_name
            )
            if "not exists" in result:
                # Same branch pull_updates() will later pull, so the clone
                # does not start on a different branch than the updates.
                target_branch = branch or git_config.get("branch") or "main"
                clone_cmd = (
                    f"git clone -b {target_branch} {auth_url} {self.local_path}"
                )
                remote_clone_cmd = self.ssh_handler._get_remote_command(
                    instance, clone_cmd
                )
                self.ssh_handler._execute_command(remote_clone_cmd, instance_name)
            return True
        except Exception as e:
            color_log.Show("FAILED", f"Error in clone_or_open_repo: {e}")
            return False

    def pull_updates(self, instance_name=None, branch=None, force=False):
        """Pull updates with SSH support.

        Args:
            instance_name (str, optional): Name of the instance; when omitted
                the pull runs locally in ``self.local_path``.
            branch (str, optional): Branch to pull from. For remote
                operations the instance's ``git.branch`` takes precedence
                when it is configured. Defaults to ``main``.
            force (bool, optional): If True, fetch, hard-reset to
                ``origin/<branch>`` and remove untracked files instead of
                pulling.

        Returns:
            ``True`` on success, ``False`` if any step failed (the error is
            logged through ``color_log``).
        """
        try:
            if not instance_name:
                # Local operation
                if not self.local_path:
                    raise ValueError("local_path not set for local operation")
                git_base = ["git", "-C", self.local_path]
                target_branch = branch or "main"
                if force:
                    self._execute_command([*git_base, "fetch", "origin"], "local")
                    self._execute_command(
                        [*git_base, "reset", "--hard", f"origin/{target_branch}"],
                        "local",
                    )
                    self._execute_command([*git_base, "clean", "-fd"], "local")
                else:
                    self._execute_command(
                        [*git_base, "pull", "origin", target_branch], "local"
                    )
                return True

            # Remote operation
            instance = self.config.get_instance(instance_name)
            # Validate before touching the instance so a missing instance is
            # reported as such instead of as an attribute error.
            if not instance:
                raise ValueError(f"Instance {instance_name} not found")
            git_config = instance.get("git") or {}
            target_branch = git_config.get("branch") or branch or "main"

            # Set local_path from instance configuration
            self.local_path = git_config.get("local_path")
            if not self.local_path:
                raise ValueError(
                    f"No local_path configured for instance {instance_name}"
                )

            # Get repository URL and add authentication
            auth_url = self._get_auth_url(git_config.get("repo_url"), instance)

            # Configure git to use credentials and set remote URL
            git_commands = [
                f"cd {self.local_path}",
                "git config --local credential.helper store",
            ]
            if auth_url:
                # NOTE: this stores the credentials in the remote checkout's
                # .git/config. Skipped when no repo_url is configured so the
                # existing origin is left alone.
                git_commands.append(f"git remote set-url origin {auth_url}")

            if force:
                git_commands.extend(
                    [
                        "git fetch origin",
                        f"git reset --hard origin/{target_branch}",
                        "git clean -fd",  # Remove untracked files and directories
                    ]
                )
            else:
                git_commands.append(f"git pull origin {target_branch}")

            # Combine commands and execute remotely
            combined_cmd = " && ".join(git_commands)
            remote_cmd = self.ssh_handler._get_remote_command(instance, combined_cmd)
            self.ssh_handler._execute_command(remote_cmd, instance_name)
            return True
        except Exception as e:
            color_log.Show("FAILED", f"Error pulling updates: {e}")
            return False

    def get_current_commit(self):
        """Return the hex SHA of ``self.repo``'s HEAD commit, or ``None``.

        ``None`` is returned when no repository object has been attached to
        this handler.
        """
        repo = getattr(self, "repo", None)
        return repo.head.commit.hexsha if repo else None

    @staticmethod
    def get_submodule_paths(project_path):
        """Return the absolute-ish paths of all submodules of a checkout.

        Args:
            project_path: Working tree in which ``git submodule foreach`` runs.

        Returns:
            A list of ``os.path.join(project_path, <submodule path>)`` entries;
            empty when the git command fails or there are no submodules.
        """
        result = subprocess.run(
            ["git", "submodule", "--quiet", "foreach", "echo $path"],
            cwd=project_path,
            capture_output=True,
            text=True,
        )
        paths = result.stdout.strip().split("\n") if result.returncode == 0 else []
        return [os.path.join(project_path, p) for p in paths if p]

    @staticmethod
    def get_git_diff(project_path, summary=True):
        """Run git diff and return the output.

        Both unstaged (``git diff``) and staged (``git diff --cached``)
        changes are collected.

        Args:
            project_path: Working tree to inspect.
            summary: When true, pass ``--compact-summary`` to git.

        Returns:
            The combined diff text, or ``"✓ No changes"`` when git produced
            no output (including when it could not be run in that directory).
        """
        extra = ["--compact-summary"] if summary else []

        def run_diff(*args):
            # An argument list with cwd avoids building a shell string
            # around the path.
            try:
                completed = subprocess.run(
                    ["git", "diff", *args, *extra],
                    cwd=project_path or None,
                    stdout=subprocess.PIPE,
                    text=True,
                )
            except OSError:
                # Missing directory or git binary: behave like "no output".
                return ""
            return completed.stdout.strip()

        # Combine both staged and unstaged changes
        all_changes = (run_diff() + "\n" + run_diff("--cached")).strip()
        return all_changes if all_changes else "✓ No changes"

    async def has_submodules(self):
        """Check if the repository has any submodules.

        Reads ``.gitmodules`` relative to the current working directory of
        the process.
        """
        proc = await asyncio.create_subprocess_exec(
            "git",
            "config",
            "--file",
            ".gitmodules",
            "--get-regexp",
            r"^submodule\.",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        return bool(stdout.decode().strip())

    @staticmethod
    def _parse_submodule_paths(config_output):
        """Map absolute submodule paths to names from ``git config`` output.

        Each line is expected to look like ``submodule.<name>.path <path>``.
        The name is everything between the ``submodule.`` prefix and the
        ``.path`` suffix, so names containing dots are kept intact, and the
        path is the remainder of the line, so paths containing spaces are
        not dropped.  Lines that do not match are ignored.
        """
        prefix = "submodule."
        submodule_info = {}
        for line in config_output.splitlines():
            if not line.startswith(prefix):
                continue
            name, separator, path = line[len(prefix):].partition(".path ")
            if not (separator and name and path):
                continue
            submodule_info[os.path.abspath(path)] = name
        return submodule_info

    async def get_submodule_info(self):
        """Return ``{absolute submodule path: submodule name}``.

        Reads ``.gitmodules`` relative to the current working directory of
        the process.  Returns an empty dict when there are no submodules or
        when git writes anything to stderr (the message is printed).
        """
        if not await self.has_submodules():
            return {}
        proc = await asyncio.create_subprocess_exec(
            "git",
            "config",
            "--file",
            ".gitmodules",
            "--get-regexp",
            r"^submodule\..*\.path$",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if stderr:
            print(f"Error getting submodule paths:\n{stderr.decode()}")
            return {}
        return self._parse_submodule_paths(stdout.decode())


# The two path-only helpers take no ``self``; they are also exposed at module
# level so direct imports of these names resolve as well as class access.
get_submodule_paths = GitHandler.get_submodule_paths
get_git_diff = GitHandler.get_git_diff