[CHG] runbot_merge: move staging readiness to batch

Staging readiness is a batch-level concern, and many of the markers
are already there, though a few need to be aggregated from the PRs.
As such, staging has no reason to be performed in terms of PRs
anymore; it should be performed via batches directly.

There is a bit of a mess in order not to completely fuck up when
retargeting PRs (implicitly via the freeze wizard, or explicitly), as
for now we're moving PRs between batches in order to keep the
batches *mostly* target-bound.

Some of the side-effects of managing the coherence of the targeting
and of moving PRs between batches are... not great. This might need
to be revisited and cleaned up with those scenarios better considered.
Xavier Morel 2024-02-05 16:10:15 +01:00
parent 9ddf017768
commit ef6a002ea7
6 changed files with 118 additions and 93 deletions
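
For orientation, a minimal sketch of the batch-level readiness idea
(simplified illustrative code, not taken from this commit; the real
compute and the stager changes are in the diffs below):

# Sketch only: a batch is stageable when none of its PRs block it,
# mirroring the _compute_stageable() added in batch.py below.
def batch_is_stageable(batch):
    if batch.merge_date:
        return False  # already merged
    if any(p.error or p.draft or not (p.squash or p.merge_method)
           for p in batch.prs):
        return False  # a single PR blocks the whole batch
    if batch.skipchecks:
        return True   # checks were explicitly waived for the batch
    return all(p.reviewed_by and p.status == 'success' for p in batch.prs)

# Staging then iterates batches for which this holds, rather than
# re-grouping individual ready PRs by label as before.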

View File

@@ -176,7 +176,7 @@ def handle_pr(env, event):
return env['runbot_merge.pull_requests'].search([
('repository', '=', repo.id),
('number', '=', pr['number']),
('target', '=', target.id),
# ('target', '=', target.id),
])
# edition difficulty: pr['base']['ref'] is the *new* target, the old one
# is at event['change']['base']['ref'] (if the target changed), so edition

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from odoo import models, fields
from odoo import models, fields, api
class Batch(models.Model):
@@ -36,3 +36,29 @@ class Batch(models.Model):
default=False, tracking=True,
help="Cancels current staging on target branch when becoming ready"
)
blocked = fields.Char(store=True, compute="_compute_stageable")
@api.depends(
"merge_date",
"prs.error", "prs.draft", "prs.squash", "prs.merge_method",
"skipchecks", "prs.status", "prs.reviewed_by",
)
def _compute_stageable(self):
for batch in self:
if batch.merge_date:
batch.blocked = "Merged."
elif blocking := batch.prs.filtered(
lambda p: p.error or p.draft or not (p.squash or p.merge_method)
):
batch.blocked = "Pull request(s) %s blocked." % (
p.display_name for p in blocking
)
elif not batch.skipchecks and (unready := batch.prs.filtered(
lambda p: not (p.reviewed_by and p.status == "success")
)):
batch.blocked = "Pull request(s) %s waiting for CI." % ', '.join(
p.display_name for p in unready
)
else:
batch.blocked = False
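
Since the new blocked field is stored (store=True), it can be searched on
directly; a hypothetical consumer might look like this (the batch's target
field is assumed from the b.target column used in staging.py below, not
from this hunk):

# Hypothetical usage sketch, not part of this commit: batches on a target
# branch that the compute above left unblocked (False is stored as NULL).
stageable = env['runbot_merge.batch'].search([
    ('target', '=', branch.id),
    ('blocked', '=', False),
])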

View File

@@ -218,7 +218,8 @@ class FreezeWizard(models.Model):
}
for repo, copy in repos.items():
copy.fetch(git.source_url(repo, 'github'), '+refs/heads/*:refs/heads/*')
for pr in self.release_pr_ids.pr_id | self.bump_pr_ids.pr_id:
all_prs = self.release_pr_ids.pr_id | self.bump_pr_ids.pr_id
for pr in all_prs:
repos[pr.repository].fetch(
git.source_url(pr.repository, 'github'),
pr.head,
@@ -338,7 +339,15 @@
f"Unable to {reason} branch {repo}:{branch}.{addendum}"
)
all_prs = self.release_pr_ids.pr_id | self.bump_pr_ids.pr_id
b = self.env['runbot_merge.branch'].search([('name', '=', self.branch_name)])
self.env.cr.execute(
"UPDATE runbot_merge_batch SET target=%s WHERE id = %s;"
"UPDATE runbot_merge_pull_requests SET target=%s WHERE id = any(%s)",
[
b.id, self.release_pr_ids.pr_id.batch_id.id,
b.id, self.release_pr_ids.pr_id.ids,
]
)
all_prs.batch_id.merge_date = fields.Datetime.now()
all_prs.reviewed_by = self.env.user.partner_id.id
self.env['runbot_merge.pull_requests.feedback'].create([{

View File

@@ -1094,15 +1094,12 @@ class PullRequests(models.Model):
if canceler:
fields.append('state')
fields.append('skipchecks')
if 'target' in vals:
fields.append('target')
if 'message' in vals:
fields.append('message')
prev = {pr.id: {field: pr[field] for field in fields} for pr in self}
# when explicitly marking a PR as ready
if vals.get('state') == 'ready':
# skip checks anyway
vals['skipchecks'] = True
self.batch_id.skipchecks = True
# if the state is forced to ready, set current user as reviewer
# and override all statuses
vals.setdefault('reviewed_by', self.env.user.partner_id.id)
@@ -1117,6 +1114,20 @@
('prs', '=', True),
])
}))
for pr in self:
if (t := vals.get('target')) is not None and pr.target.id != t:
pr.unstage(
"target (base) branch was changed from %r to %r",
pr.target.display_name,
self.env['runbot_merge.branch'].browse(t).display_name,
)
if 'message' in vals:
merge_method = vals['merge_method'] if 'merge_method' in vals else pr.merge_method
if merge_method not in (False, 'rebase-ff') and pr.message != vals['message']:
pr.unstage("merge message updated")
match vals.get('closed'):
case True if not self.closed:
vals['reviewed_by'] = False
@@ -1128,6 +1139,13 @@
target=vals.get('target') or self.target.id,
label=vals.get('label') or self.label,
)
case None if (new_target := vals.get('target')) and new_target != self.target.id:
vals['batch_id'] = self._get_batch(
target=new_target,
label=vals.get('label') or self.label,
)
if len(self.batch_id.prs) == 1:
self.env.cr.precommit.add(self.batch_id.unlink)
w = super().write(vals)
newhead = vals.get('head')
@@ -1135,29 +1153,20 @@
c = self.env['runbot_merge.commit'].search([('sha', '=', newhead)])
self._validate(c.statuses or '{}')
for pr in self:
old = prev[pr.id]
if canceler:
if canceler:
for pr in self:
if not pr.cancel_staging:
continue
old = prev[pr.id]
def ready(pr):
return pr['state'] == 'ready'\
or (pr['skipchecks'] and pr['state'] != 'error')
if pr.cancel_staging and not ready(old) and ready(pr):
if old['state'] == 'error': # error -> ready gets a bespok message
if ready(pr) and not ready(old):
if old['state'] == 'error': # error -> ready gets a bespoke message
pr.target.active_staging_id.cancel(f"retrying {pr.display_name}")
else:
pr.target.active_staging_id.cancel(f"{pr.display_name} became ready")
if 'target' in vals:
old_target = old['target']
if pr.target != old_target:
pr.unstage(
"target (base) branch was changed from %r to %r",
old_target.display_name, pr.target.display_name,
)
if 'message' in vals:
if pr.merge_method not in (False, 'rebase-ff') and pr.message != old['message']:
pr.unstage("merge message updated")
return w
def _check_linked_prs_statuses(self, commit=False):

View File

@@ -6,6 +6,7 @@ import json
import logging
import os
import re
from collections.abc import Mapping
from difflib import Differ
from itertools import takewhile
from operator import itemgetter
@@ -14,7 +15,7 @@ from typing import Dict, Union, Optional, Literal, Callable, Iterator, Tuple, List
from werkzeug.datastructures import Headers
from odoo import api, models, fields
from odoo.tools import OrderedSet
from odoo.tools import OrderedSet, groupby
from .pull_requests import Branch, Stagings, PullRequests, Repository
from .batch import Batch
from .. import exceptions, utils, github, git
@@ -57,76 +58,63 @@ def try_staging(branch: Branch) -> Optional[Stagings]:
if branch.active_staging_id:
return None
rows = [
(p, prs)
for p, prs in ready_prs(for_branch=branch)
if not any(prs.mapped('blocked'))
]
def log(label, batches):
_logger.info(label, ', '.join(
p.display_name
for batch in batches
for p in batch
))
def log(label: str, batches: Batch) -> None:
_logger.info(label, ', '.join(batches.mapped('prs.display_name')))
rows = ready_prs(for_branch=branch)
priority = rows[0][0] if rows else None
concat = branch.env['runbot_merge.batch'].concat
if priority == 'alone':
batched_prs = [pr_ids for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
log("staging high-priority PRs %s", batched_prs)
batches: Batch = concat(*(batch for _, batch in takewhile(lambda r: r[0] == priority, rows)))
log("staging high-priority PRs %s", batches)
elif branch.project_id.staging_priority == 'default':
if branch.split_ids:
split_ids = branch.split_ids[0]
batched_prs = [batch.prs for batch in split_ids.batch_ids]
split_ids.unlink()
log("staging split PRs %s (prioritising splits)", batched_prs)
if split := branch.split_ids[:1]:
batches = split.batch_ids
split.unlink()
log("staging split PRs %s (prioritising splits)", batches)
else:
# priority, normal; priority = sorted ahead of normal, so always picked
# first as long as there's room
batched_prs = [pr_ids for _, pr_ids in rows]
log("staging ready PRs %s (prioritising splits)", batched_prs)
batches = concat(*(batch for _, batch in rows))
log("staging ready PRs %s (prioritising splits)", batches)
elif branch.project_id.staging_priority == 'ready':
# splits are ready by definition, we need to exclude them from the
# ready rows otherwise we'll re-stage the splits so if an error is legit
# we cycle forever
# FIXME: once the batches are less shit, store this durably on the
# batches and filter out when fetching readies (?)
split_batches = {batch.prs for batch in branch.split_ids.batch_ids}
ready = [pr_ids for _, pr_ids in rows if pr_ids not in split_batches]
batches = concat(*(batch for _, batch in rows)) - branch.split_ids.batch_ids
if ready:
batched_prs = ready
log("staging ready PRs %s (prioritising ready)", batched_prs)
if batches:
log("staging ready PRs %s (prioritising ready)", batches)
else:
split_ids = branch.split_ids[:1]
batched_prs = [batch.prs for batch in split_ids.batch_ids]
split_ids.unlink()
log("staging split PRs %s (prioritising ready)", batched_prs)
split = branch.split_ids[:1]
batches = split.batch_ids
split.unlink()
log("staging split PRs %s (prioritising ready)", batches)
else:
assert branch.project_id.staging_priority == 'largest'
# splits are ready by definition, we need to exclude them from the
# ready rows otherwise ready always wins but we re-stage the splits, so
# if an error is legit we'll cycle forever
split_batches = {batch.prs for batch in branch.split_ids.batch_ids}
ready = [pr_ids for _, pr_ids in rows if pr_ids not in split_batches]
batches = concat(*(batch for _, batch in rows)) - branch.split_ids.batch_ids
maxsplit = max(branch.split_ids, key=lambda s: len(s.batch_ids), default=branch.env['runbot_merge.split'])
_logger.info("largest split = %d, ready = %d", len(maxsplit.batch_ids), len(ready))
_logger.info("largest split = %d, ready = %d", len(maxsplit.batch_ids), len(batches))
# bias towards splits if len(ready) = len(batch_ids)
if len(maxsplit.batch_ids) >= len(ready):
batched_prs = [batch.prs for batch in maxsplit.batch_ids]
if len(maxsplit.batch_ids) >= len(batches):
batches = maxsplit.batch_ids
maxsplit.unlink()
log("staging split PRs %s (prioritising largest)", batched_prs)
log("staging split PRs %s (prioritising largest)", batches)
else:
batched_prs = ready
log("staging ready PRs %s (prioritising largest)", batched_prs)
log("staging ready PRs %s (prioritising largest)", batches)
if not batched_prs:
if not batches:
return
original_heads, staging_state = staging_setup(branch, batched_prs)
original_heads, staging_state = staging_setup(branch, batches)
staged = stage_batches(branch, batched_prs, staging_state)
staged = stage_batches(branch, batches, staging_state)
if not staged:
return None
@@ -216,32 +204,25 @@ For-Commit-Id: {it.head}
return st
def ready_prs(for_branch: Branch) -> List[Tuple[int, PullRequests]]:
def ready_prs(for_branch: Branch) -> List[Tuple[int, Batch]]:
env = for_branch.env
env.cr.execute("""
SELECT
max(pr.priority) as priority,
array_agg(pr.id) AS match
FROM runbot_merge_pull_requests pr
JOIN runbot_merge_batch b ON (b.id = pr.batch_id)
WHERE pr.target = any(%s)
-- exclude terminal states (so there's no issue when
-- deleting branches & reusing labels)
AND pr.state != 'merged'
AND pr.state != 'closed'
b.id as batch
FROM runbot_merge_batch b
JOIN runbot_merge_pull_requests pr ON (b.id = pr.batch_id)
WHERE b.target = %s AND b.blocked IS NULL
GROUP BY b.id
HAVING
bool_and(pr.state = 'ready')
OR (bool_or(b.skipchecks) AND bool_and(pr.state != 'error'))
ORDER BY max(pr.priority) DESC, min(pr.id)
""", [for_branch.ids])
browse = env['runbot_merge.pull_requests'].browse
return [(p, browse(ids)) for p, ids in env.cr.fetchall()]
ORDER BY max(pr.priority) DESC, min(b.id)
""", [for_branch.id])
browse = env['runbot_merge.batch'].browse
return [(p, browse(id)) for p, id in env.cr.fetchall()]
def staging_setup(
target: Branch,
batched_prs: List[PullRequests],
batches: Batch,
) -> Tuple[Dict[Repository, str], StagingState]:
"""Sets up the staging:
@@ -249,7 +230,9 @@ def staging_setup(
- creates tmp branch via gh API (to remove)
- generates working copy for each repository with the target branch
"""
all_prs: PullRequests = target.env['runbot_merge.pull_requests'].concat(*batched_prs)
by_repo: Mapping[Repository, List[PullRequests]] = \
dict(groupby(batches.prs, lambda p: p.repository))
staging_state = {}
original_heads = {}
for repo in target.project_id.repo_ids.having_branch(target):
@@ -265,7 +248,7 @@
# be hooked only to "proper" remote-tracking branches
# (in `refs/remotes`), it doesn't seem to work here
f'+refs/heads/{target.name}:refs/heads/{target.name}',
*(pr.head for pr in all_prs if pr.repository == repo)
*(pr.head for pr in by_repo.get(repo, []))
)
original_heads[repo] = head
staging_state[repo] = StagingSlice(gh=gh, head=head, repo=source.stdout().with_config(text=True, check=False))
@@ -273,18 +256,15 @@
return original_heads, staging_state
def stage_batches(branch: Branch, batched_prs: List[PullRequests], staging_state: StagingState) -> Stagings:
def stage_batches(branch: Branch, batches: Batch, staging_state: StagingState) -> Stagings:
batch_limit = branch.project_id.batch_limit
env = branch.env
staged = env['runbot_merge.batch']
for batch in batched_prs:
for batch in batches:
if len(staged) >= batch_limit:
break
assert len(bs := {p.batch_id for p in batch}) == 1,\
f"expected all PRs to have the same batch, got {bs}"
try:
staged |= stage_batch(env, batch[0].batch_id, staging_state)
staged |= stage_batch(env, batch, staging_state)
except exceptions.MergeError as e:
pr = e.args[0]
_logger.info("Failed to stage %s into %s", pr.display_name, branch.name, exc_info=True)

View File

@@ -773,6 +773,7 @@ class TestPREdition:
with repo: prx.base = '1.0'
assert pr.target == branch_1
assert not pr.staging_id, "updated the base of a staged PR should have unstaged it"
assert st.state == 'cancelled', f"expected cancellation, got {st.state}"
assert st.reason == f"{pr.display_name} target (base) branch was changed from 'master' to '1.0'"
with repo: prx.base = '2.0'