mirror of
https://github.com/odoo/runbot.git
synced 2025-03-19 17:35:45 +07:00
135 lines
4.4 KiB
Python
135 lines
4.4 KiB
Python
![]() |
import dataclasses
|
||
|
import itertools
|
||
|
import logging
|
||
|
import pathlib
|
||
|
import resource
|
||
|
import subprocess
|
||
|
from typing import Optional, TypeVar, Union, Generic
|
||
|
|
||
|
from odoo.tools.appdirs import user_cache_dir
|
||
|
|
||
|
_logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def source_url(repository, prefix: str) -> str:
|
||
|
return 'https://{}:{}@github.com/{}'.format(
|
||
|
repository.project_id[f'{prefix}_name'] or '',
|
||
|
repository.project_id[f'{prefix}_token'],
|
||
|
repository.name,
|
||
|
)
|
||
|
|
||
|
|
||
|
def get_local(repository, prefix: Optional[str]) -> 'Optional[Repo[subprocess.CompletedProcess]]':
|
||
|
repos_dir = pathlib.Path(user_cache_dir('mergebot'))
|
||
|
repos_dir.mkdir(parents=True, exist_ok=True)
|
||
|
# NB: `repository.name` is `$org/$name` so this will be a subdirectory, probably
|
||
|
repo_dir = repos_dir / repository.name
|
||
|
|
||
|
if repo_dir.is_dir():
|
||
|
return git(repo_dir)
|
||
|
elif prefix:
|
||
|
_logger.info("Cloning out %s to %s", repository.name, repo_dir)
|
||
|
subprocess.run(['git', 'clone', '--bare', source_url(repository, prefix), str(repo_dir)], check=True)
|
||
|
# bare repos don't have fetch specs by default, and fetching *into*
|
||
|
# them is a pain in the ass, configure fetch specs so `git fetch`
|
||
|
# works properly
|
||
|
repo = git(repo_dir)
|
||
|
repo.config('--add', 'remote.origin.fetch', '+refs/heads/*:refs/heads/*')
|
||
|
# negative refspecs require git 2.29
|
||
|
repo.config('--add', 'remote.origin.fetch', '^refs/heads/tmp.*')
|
||
|
repo.config('--add', 'remote.origin.fetch', '^refs/heads/staging.*')
|
||
|
return repo
|
||
|
|
||
|
|
||
|
ALWAYS = ('gc.auto=0', 'maintenance.auto=0')
|
||
|
|
||
|
|
||
|
def _bypass_limits():
|
||
|
resource.setrlimit(resource.RLIMIT_AS, (resource.RLIM_INFINITY, resource.RLIM_INFINITY))
|
||
|
|
||
|
|
||
|
def git(directory: str) -> 'Repo[subprocess.CompletedProcess]':
|
||
|
return Repo(directory, check=True)
|
||
|
|
||
|
|
||
|
Self = TypeVar("Self", bound="Repo")
|
||
|
T = TypeVar('T', bound=Union[subprocess.CompletedProcess, subprocess.Popen])
|
||
|
class Repo(Generic[T]):
|
||
|
def __init__(self, directory, **config) -> None:
|
||
|
self._directory = str(directory)
|
||
|
config.setdefault('stderr', subprocess.PIPE)
|
||
|
self._config = config
|
||
|
self._params = ()
|
||
|
self._opener = subprocess.run
|
||
|
|
||
|
def __getattr__(self, name: str) -> 'GitCommand':
|
||
|
return GitCommand(self, name.replace('_', '-'))
|
||
|
|
||
|
def _run(self, *args, **kwargs) -> T:
|
||
|
opts = {**self._config, **kwargs}
|
||
|
args = ('git', '-C', self._directory)\
|
||
|
+ tuple(itertools.chain.from_iterable(('-c', p) for p in self._params + ALWAYS))\
|
||
|
+ args
|
||
|
try:
|
||
|
return self._opener(args, preexec_fn=_bypass_limits, **opts)
|
||
|
except subprocess.CalledProcessError as e:
|
||
|
stream = e.stderr if e.stderr else e.stdout
|
||
|
if stream:
|
||
|
_logger.error("git call error: %s", stream)
|
||
|
raise
|
||
|
|
||
|
def stdout(self, flag: bool = True) -> Self:
|
||
|
if flag is True:
|
||
|
return self.with_config(stdout=subprocess.PIPE)
|
||
|
elif flag is False:
|
||
|
return self.with_config(stdout=None)
|
||
|
return self.with_config(stdout=flag)
|
||
|
|
||
|
def lazy(self) -> 'Repo[subprocess.Popen]':
|
||
|
r = self.with_config()
|
||
|
r._config.pop('check', None)
|
||
|
r._opener = subprocess.Popen
|
||
|
return r
|
||
|
|
||
|
def check(self, flag: bool) -> Self:
|
||
|
return self.with_config(check=flag)
|
||
|
|
||
|
def with_config(self, **kw) -> Self:
|
||
|
opts = {**self._config, **kw}
|
||
|
r = Repo(self._directory, **opts)
|
||
|
r._opener = self._opener
|
||
|
r._params = self._params
|
||
|
return r
|
||
|
|
||
|
def with_params(self, *args) -> Self:
|
||
|
r = self.with_config()
|
||
|
r._params = args
|
||
|
return r
|
||
|
|
||
|
def clone(self, to: str, branch: Optional[str] = None) -> 'Repo[subprocess.CompletedProcess]':
|
||
|
self._run(
|
||
|
'clone',
|
||
|
*([] if branch is None else ['-b', branch]),
|
||
|
self._directory, to,
|
||
|
)
|
||
|
return Repo(to)
|
||
|
|
||
|
|
||
|
@dataclasses.dataclass
|
||
|
class GitCommand(Generic[T]):
|
||
|
repo: Repo[T]
|
||
|
name: str
|
||
|
|
||
|
def __call__(self, *args, **kwargs) -> T:
|
||
|
return self.repo._run(self.name, *args, *self._to_options(kwargs))
|
||
|
|
||
|
def _to_options(self, d):
|
||
|
for k, v in d.items():
|
||
|
if len(k) == 1:
|
||
|
yield '-' + k
|
||
|
else:
|
||
|
yield '--' + k.replace('_', '-')
|
||
|
if v not in (None, True):
|
||
|
assert v is not False
|
||
|
yield str(v)
|