runbot/mergebot_test_utils/utils.py
Xavier Morel e3b4d2bb40 [IMP] *: cleanup status contexts in tests
For historical reasons pretty much all tests used to use the contexts
legal/cla and ci/runbot. While there are a few tests where we need the
interactions of multiple contexts and that makes sense, on the vast
majority of tests that's just extra traffic and noise in the
test (from needing to send multiple statuses unnecessarily).

In fact on the average PR where everything passes by default we could
even remove the required statuses entirely...
2025-02-06 14:55:28 +01:00

234 lines
8.0 KiB
Python

# -*- coding: utf-8 -*-
import contextlib
import itertools
import re
import time
import typing
from lxml import html
MESSAGE_TEMPLATE = """{message}
closes {repo}#{number}
{headers}Signed-off-by: {name} <{email}>"""
# target branch '-' source branch '-' batch id '-fw'
REF_PATTERN = r'{target}-{source}-\d+-fw'
class Commit:
def __init__(self, message, *, author=None, committer=None, tree, reset=False):
self.id = None
self.message = message
self.author = author
self.committer = committer
self.tree = tree
self.reset = reset
def validate_all(repos, refs, contexts=('default',)):
""" Post a "success" status for each context on each ref of each repo
"""
for repo, branch, context in itertools.product(repos, refs, contexts):
repo.post_status(branch, 'success', context)
def get_partner(env, gh_login):
return env['res.partner'].search([('github_login', '=', gh_login)])
def _simple_init(repo):
""" Creates a very simple initialisation: a master branch with a commit,
and a PR by 'user' with two commits, targeted to the master branch
"""
m = repo.make_commit(None, 'initial', None, tree={'m': 'm'})
repo.make_ref('heads/master', m)
c1 = repo.make_commit(m, 'first', None, tree={'m': 'c1'})
c2 = repo.make_commit(c1, 'second', None, tree={'m': 'c2'})
prx = repo.make_pr(title='title', body='body', target='master', head=c2)
return prx
class matches(str):
# necessary so str.__new__ does not freak out on `flags`
def __new__(cls, pattern, flags=0):
return super().__new__(cls, pattern)
def __init__(self, pattern, flags=0):
p, n = re.subn(
# `re.escape` will escape the `$`, so we need to handle that...
# maybe it should not be $?
r'\\\$(\w*?)\\\$',
lambda m: f'(?P<{m[1]}>.*?)' if m[1] else '(.*?)',
re.escape(self),
)
assert n, f"matches' pattern should have at least one placeholder, found none in\n{pattern}"
self._r = re.compile(p, flags | re.DOTALL)
def __eq__(self, text):
if not isinstance(text, str):
return NotImplemented
return self._r.search(text)
def seen(env, pr, users):
url = to_pr(env, pr).url
return users['user'], f'[![Pull request status dashboard]({url}.png)]({url})'
def make_basic(
env,
config,
make_repo,
*,
project_name='myproject',
reponame='proj',
statuses,
fp_token=True,
fp_remote=True,
):
""" Creates a project ``project_name`` **if none exists**, otherwise
retrieves the existing one and adds a new repository and its fork.
Repositories are setup with three forking branches:
::
f = 0 -- 1 -- 2 -- 3 -- 4 : a
|
g = `-- 11 -- 22 : b
|
h = `-- 111 : c
each branch just adds and modifies a file (resp. f, g and h) through the
contents sequence a b c d e
:param env: Environment, for odoo model interactions
:param config: pytest project config thingie
:param make_repo: repo maker function, normally the fixture, should be a
``Callable[[str], Repo]``
:param project_name: internal project name, can be used to recover the
project object afterward, matches exactly since it's
unique per odoo db (and thus test)
:param reponame: the base name of the repository, for identification, for
concurrency reasons the actual repository name *will* be
different
:param statuses: required statuses for the repository, stupidly default to
the old Odoo statuses, should be moved to ``default`` over
time for simplicity (unless the test specifically calls for
multiple statuses)
:param fp_token: whether to set the ``fp_github_token`` on the project if
/ when creating it
:param fp_remote: whether to create a fork repo and set it as the
repository's ``fp_remote_target``
"""
Projects = env['runbot_merge.project']
project = Projects.search([('name', '=', project_name)])
if not project:
project = env['runbot_merge.project'].create({
'name': project_name,
'github_token': config['github']['token'],
'github_prefix': 'hansen',
'github_name': config['github']['name'],
'github_email': "foo@example.org",
'fp_github_token': fp_token and config['github']['token'],
'fp_github_name': 'herbert',
'branch_ids': [
(0, 0, {'name': 'a', 'sequence': 100}),
(0, 0, {'name': 'b', 'sequence': 80}),
(0, 0, {'name': 'c', 'sequence': 60}),
],
})
prod = make_repo(reponame)
env['runbot_merge.events_sources'].create({'repository': prod.name})
with prod:
_0, _1, a_2, _3, _4, = prod.make_commits(
None,
Commit("0", tree={'f': 'a'}),
Commit("1", tree={'f': 'b'}),
Commit("2", tree={'f': 'c'}),
Commit("3", tree={'f': 'd'}),
Commit("4", tree={'f': 'e'}),
ref='heads/a',
)
b_1, _2 = prod.make_commits(
a_2,
Commit('11', tree={'g': 'a'}),
Commit('22', tree={'g': 'b'}),
ref='heads/b',
)
prod.make_commits(
b_1,
Commit('111', tree={'h': 'a'}),
ref='heads/c',
)
other = prod.fork() if fp_remote else None
repo = env['runbot_merge.repository'].create({
'project_id': project.id,
'name': prod.name,
'required_statuses': statuses,
'fp_remote_target': other.name if other else False,
'group_id': False,
})
env['res.partner'].search([
('github_login', '=', config['role_reviewer']['user'])
]).write({
'review_rights': [(0, 0, {'repository_id': repo.id, 'review': True})]
})
env['res.partner'].search([
('github_login', '=', config['role_self_reviewer']['user'])
]).write({
'review_rights': [(0, 0, {'repository_id': repo.id, 'self_review': True})]
})
return prod, other
def pr_page(page, pr):
return html.fromstring(page(f'/{pr.repo.name}/pull/{pr.number}'))
def to_pr(env, pr, *, attempts=5):
for _ in range(attempts):
pr_id = env['runbot_merge.pull_requests'].search([
('repository.name', '=', pr.repo.name),
('number', '=', pr.number),
])
if pr_id:
assert len(pr_id) == 1, f"Expected to find {pr.repo.name}#{pr.number}, got {pr_id}."
return pr_id
time.sleep(1)
raise TimeoutError(f"Unable to find {pr.repo.name}#{pr.number}")
def part_of(label, pr_id, *, separator='\n\n'):
""" Adds the "part-of" pseudo-header in the footer.
"""
return f"""\
{label}{separator}\
Part-of: {pr_id.display_name}
Signed-off-by: {pr_id.reviewed_by.formatted_email}"""
def ensure_one(records):
assert len(records) == 1
return records
@contextlib.contextmanager
def prevent_unstaging(st) -> None:
# hack: `Stagings.cancel` only cancels *active* stagings,
# so if we disable the staging, do a thing, then re-enable
# then things should work out
assert st and st.active, "preventing unstaging of not-a-staging is not useful"
st.active = False
try:
yield
finally:
st.active = True
TYPE_MAPPING = {
'boolean': 'integer',
'date': 'datetime',
'monetary': 'float',
'selection': 'char',
'many2one': 'char',
}
def read_tracking_value(tv) -> tuple[str, typing.Any, typing.Any]:
field_id = tv.field_id if 'field_id' in tv else tv.field
field_type = field_id.field_type if 'field_type' in field_id else field_id.ttype
t = TYPE_MAPPING.get(field_type) or field_type
return field_id.name, tv[f"old_value_{t}"], tv[f"new_value_{t}"]