[CHG] *: persistent batches

This probably has latent bugs, and is only the start of the road to v2
(#789): PR batches are now created up-front (alongside the PR), with
PRs attached and detached as needed, hopefully without breaking
anything (tests pass, but...). This required a fair number of
adjustments to code which did not take batches into account, or which
created batches on the fly.
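
In short: a PR is attached to a batch when it is created, leaves it
when it is closed, and a batch whose last PR leaves is deleted. A
condensed sketch of that lifecycle, paraphrasing the hunks below (not
the verbatim patch):

    # on creation, find-or-create an active batch with the same
    # target and label (see PullRequests._get_batch below)
    def create(self, vals):
        vals['batch_id'] = self._get_batch(
            target=vals['target'],
            label=vals['label'],
        ).id
        return super().create(vals)

    # on close, detach; the batch dies with its last PR
    old_batch = pr.batch_id
    pr.batch_id = False
    if not old_batch.prs:
        old_batch.unlink()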

`PullRequests.blocked` has also been updated to rely on the batch to
get its batch-mates, such that it can now be a stored field with the
right dependencies.
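
Concretely, batch-mates are now reached through `batch_id.prs` rather
than re-searched by label, so the compute can declare real
dependencies; a condensed sketch of the new declaration (see the
pull_requests.py hunks below):

    blocked = fields.Char(
        compute='_compute_is_blocked', store=True,
        help="PR is not currently stageable for some reason",
    )

    @api.depends(
        'batch_id.prs.draft',
        'batch_id.prs.squash',
        'batch_id.prs.merge_method',
        'batch_id.prs.state',
        'batch_id.prs.skipchecks',
    )
    def _compute_is_blocked(self):
        # walking batch_id.prs gives the ORM the right triggers to
        # invalidate the stored value when any batch-mate changes
        ...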

The next step is to better leverage this change:

- move cross-PR state up to the batch (e.g. skipchecks, priority, ...)
- add fw info to the batch, perform forward-ports batchwise in order
  to avoid redundant batch-selection work, and allow altering batches
  during fw (e.g. adding or removing PRs)
- use batches to select stagings
- maybe expose staging history of a batch?
Author: Xavier Morel, 2023-12-19 11:10:11 +01:00
Parent: c140701975
Commit: 473f89f87d
10 changed files with 147 additions and 143 deletions


@@ -84,32 +84,32 @@ class ForwardPortTasks(models.Model, Queue):
sentry_sdk.set_tag('forward-porting', batch.prs.mapped('display_name'))
newbatch = batch.prs._port_forward()
if newbatch:
_logger.info(
"Processing %s (from %s): %s (%s) -> %s (%s)",
self.id, self.source,
batch, batch.prs,
newbatch, newbatch.prs,
)
# insert new batch in ancestry sequence unless conflict (= no parent)
if self.source == 'insert':
for pr in newbatch.prs:
if not pr.parent_id:
break
newchild = pr.search([
('parent_id', '=', pr.parent_id.id),
('id', '!=', pr.id),
])
if newchild:
newchild.parent_id = pr.id
else: # reached end of seq (or batch is empty)
if not newbatch: # reached end of seq (or batch is empty)
# FIXME: or configuration is fucky so doesn't want to FP (maybe should error and retry?)
_logger.info(
"Processing %s (from %s): %s (%s) -> end of the sequence",
"Processed %s (from %s): %s (%s) -> end of the sequence",
self.id, self.source,
batch, batch.prs
)
batch.active = False
return
_logger.info(
"Processed %s (from %s): %s (%s) -> %s (%s)",
self.id, self.source,
batch, batch.prs,
newbatch, newbatch.prs,
)
# insert new batch in ancestry sequence unless conflict (= no parent)
if self.source == 'insert':
for pr in newbatch.prs:
if not pr.parent_id:
break
newchild = pr.search([
('parent_id', '=', pr.parent_id.id),
('id', '!=', pr.id),
])
if newchild:
newchild.parent_id = pr.id
class UpdateQueue(models.Model, Queue):


@@ -157,11 +157,7 @@ class Project(models.Model):
# the parents linked list, so it has a special type
for _, cs in groupby(candidates, key=lambda p: p.label):
self.env['forwardport.batches'].create({
'batch_id': self.env['runbot_merge.batch'].create({
'target': before[-1].id,
'prs': [(4, c.id, 0) for c in cs],
'active': False,
}).id,
'batch_id': cs[0].batch_id.id,
'source': 'insert',
})
@@ -321,7 +317,7 @@ class PullRequests(models.Model):
addendum = ''
# check if tip was queued for forward porting, try to cancel if we're
# supposed to stop here
if real_limit == tip.target and (task := self.env['forwardport.batches'].search([('batch_id', 'in', tip.batch_ids.ids)])):
if real_limit == tip.target and (task := self.env['forwardport.batches'].search([('batch_id', '=', tip.batch_id.id)])):
try:
with self.env.cr.savepoint():
self.env.cr.execute(
@@ -343,13 +339,13 @@
# resume
if tip.state == 'merged':
self.env['forwardport.batches'].create({
'batch_id': tip.batch_ids.sorted('id')[-1].id,
'batch_id': tip.batch_id.id,
'source': 'fp' if tip.parent_id else 'merge',
})
resumed = tip
else:
# reactivate batch
tip.batch_ids.sorted('id')[-1].active = True
tip.batch_id.active = True
resumed = tip._schedule_fp_followup()
if resumed:
addendum += f', resuming forward-port stopped at {tip.display_name}'
@@ -411,18 +407,21 @@ class PullRequests(models.Model):
_logger.info('-> wrong state %s (%s)', pr.display_name, pr.state)
continue
# check if we've already forward-ported this branch:
# it has a batch without a staging
batch = self.env['runbot_merge.batch'].with_context(active_test=False).search([
('staging_id', '=', False),
('prs', 'in', pr.id),
], limit=1)
# if the batch is inactive, the forward-port has been done *or*
# the PR's own forward port is in error, so bail
if not batch.active:
_logger.info('-> forward port done or in error (%s.active=%s)', batch, batch.active)
# check if we've already forward-ported this branch
source = pr.source_id or pr
next_target = source._find_next_target(pr)
if not next_target:
_logger.info("-> forward port done (no next target)")
continue
if n := self.search([
('source_id', '=', source.id),
('target', '=', next_target.id),
], limit=1):
_logger.info('-> already forward-ported (%s)', n.display_name)
continue
batch = pr.batch_id
# otherwise check if we already have a pending forward port
_logger.info("%s %s %s", pr.display_name, batch, ', '.join(batch.mapped('prs.display_name')))
if self.env['forwardport.batches'].search_count([('batch_id', '=', batch.id)]):
@@ -710,18 +709,9 @@ class PullRequests(models.Model):
'tags_add': labels,
})
# batch the PRs so _validate can perform the followup FP properly
# (with the entire batch). If there are conflict then create a
# deactivated batch so the interface is coherent but we don't pickup
# an active batch we're never going to deactivate.
b = self.env['runbot_merge.batch'].create({
'target': target.id,
'prs': [(6, 0, new_batch.ids)],
'active': not has_conflicts,
})
# try to schedule followup
new_batch[0]._schedule_fp_followup()
return b
return new_batch.batch_id
def _create_fp_branch(self, target_branch, fp_branch_name, cleanup):
""" Creates a forward-port for the current PR to ``target_branch`` under


@@ -367,6 +367,7 @@ def setci(*, source, repo, target, status='success'):
in ``repo``.
"""
pr = source.search([('source_id', '=', source.id), ('target.name', '=', str(target))])
assert pr, f"could not find forward port of {source.display_name} to {target}"
with repo:
repo.post_status(pr.head, status)


@@ -191,19 +191,8 @@ def test_failed_staging(env, config, make_repo):
prod.post_status('staging.c', 'failure', 'ci/runbot')
env.run_crons()
pr3_head = env['runbot_merge.commit'].search([
('sha', '=', pr3_id.head),
])
assert len(pr3_head) == 1
assert not pr3_id.batch_id, "check that the PR indeed has no batch anymore"
assert not pr3_id.batch_ids.filtered(lambda b: b.active)
assert len(env['runbot_merge.batch'].search([
('prs', 'in', pr3_id.id),
'|', ('active', '=', True),
('active', '=', False),
])) == 2, "check that there do exist batches"
pr3_head = env['runbot_merge.commit'].search([('sha', '=', pr3_id.head)])
assert pr3_head
# send a new status to the PR, as if somebody had rebuilt it or something
with prod:
@@ -213,6 +202,8 @@ def test_failed_staging(env, config, make_repo):
assert pr3_head.to_check, "check that the commit was updated as to process"
env.run_crons()
assert not pr3_head.to_check, "check that the commit was processed"
assert pr3_id.state == 'ready'
assert pr3_id.staging_id
class TestNotAllBranches:
""" Check that forward-ports don't behave completely insanely when not all
@@ -788,6 +779,9 @@ def test_retarget_after_freeze(env, config, make_repo, users):
prod.post_status('staging.bprime', 'success', 'legal/cla')
env.run_crons()
# #2 batch 6 (???)
assert port_id.state == 'merged'
new_pr_id = env['runbot_merge.pull_requests'].search([('state', 'not in', ('merged', 'closed'))])
assert len(new_pr_id) == 1
assert new_pr_id.parent_id == port_id


@@ -2,6 +2,7 @@ from . import ir_actions
from . import res_partner
from . import project
from . import pull_requests
from . import batch
from . import project_freeze
from . import stagings_create
from . import staging_cancel


@@ -0,0 +1,21 @@
from __future__ import annotations
from odoo import models, fields
class Batch(models.Model):
""" A batch is a "horizontal" grouping of *codependent* PRs: PRs with
the same label & target but for different repositories. These are
assumed to be part of the same "change" smeared over multiple
repositories e.g. change an API in repo1, this breaks use of that API
in repo2 which now needs to be updated.
"""
_name = _description = 'runbot_merge.batch'
target = fields.Many2one('runbot_merge.branch', required=True, index=True)
staging_ids = fields.Many2many('runbot_merge.stagings')
split_id = fields.Many2one('runbot_merge.split', index=True)
prs = fields.One2many('runbot_merge.pull_requests', 'batch_id')
active = fields.Boolean(default=True)


@@ -400,9 +400,14 @@ class PullRequests(models.Model):
], compute='_compute_statuses', store=True, column_type=enum(_name, 'status'))
previous_failure = fields.Char(default='{}')
batch_id = fields.Many2one('runbot_merge.batch', string="Active Batch", compute='_compute_active_batch', store=True)
batch_ids = fields.Many2many('runbot_merge.batch', string="Batches", context={'active_test': False})
staging_id = fields.Many2one(related='batch_id.staging_id', store=True)
batch_id = fields.Many2one('runbot_merge.batch', index=True)
staging_id = fields.Many2one('runbot_merge.stagings', compute='_compute_staging', store=True)
@api.depends('batch_id.staging_ids.active')
def _compute_staging(self):
for p in self:
p.staging_id = p.batch_id.staging_ids.filtered('active')
commits_map = fields.Char(help="JSON-encoded mapping of PR commits to actually integrated commits. The integration head (either a merge commit or the PR's topmost) is mapped from the 'empty' pr commit (the key is an empty string, because you can't put a null key in json maps).", default='{}')
link_warned = fields.Boolean(
@@ -411,7 +416,7 @@
)
blocked = fields.Char(
compute='_compute_is_blocked',
compute='_compute_is_blocked', store=True,
help="PR is not currently stageable for some reason (mostly an issue if status is ready)"
)
@@ -520,28 +525,23 @@ class PullRequests(models.Model):
@property
def _linked_prs(self):
if re.search(r':patch-\d+', self.label):
return self.browse(())
if self.state == 'merged':
return self.with_context(active_test=False).batch_ids\
.filtered(lambda b: b.staging_id.state == 'success')\
.prs - self
return self.search([
('target', '=', self.target.id),
('label', '=', self.label),
('state', 'not in', ('merged', 'closed')),
]) - self
return self.batch_id.prs - self
# missing link to other PRs
@api.depends('state')
@api.depends(
'batch_id.prs.draft',
'batch_id.prs.squash',
'batch_id.prs.merge_method',
'batch_id.prs.state',
'batch_id.prs.skipchecks',
)
def _compute_is_blocked(self):
self.blocked = False
requirements = (
lambda p: not p.draft,
lambda p: p.squash or p.merge_method,
lambda p: p.state == 'ready' \
or any(pr.skipchecks for pr in (p | p._linked_prs)) \
and all(pr.state != 'error' for pr in (p | p._linked_prs))
or any(p.batch_id.prs.mapped('skipchecks')) \
and all(pr.state != 'error' for pr in p.batch_id.prs)
)
messages = ('is in draft', 'has no merge method', 'is not ready')
for pr in self:
@@ -550,7 +550,7 @@ class PullRequests(models.Model):
blocking, message = next((
(blocking, message)
for blocking in (pr | pr._linked_prs)
for blocking in pr.batch_id.prs
for requirement, message in zip(requirements, messages)
if not requirement(blocking)
), (None, None))
@@ -712,7 +712,7 @@ class PullRequests(models.Model):
else:
msg = self._approve(author, login)
case commands.Reject() if is_author:
batch = self | self._linked_prs
batch = self.batch_id.prs
if cancellers := batch.filtered('cancel_staging'):
cancellers.cancel_staging = False
if (skippers := batch.filtered('skipchecks')) or self.reviewed_by:
@@ -1035,8 +1035,22 @@ class PullRequests(models.Model):
return 'staged'
return self.state
def _get_batch(self, *, target, label):
Batches = self.env['runbot_merge.batch']
if re.search(r':patch-\d+$', label):
batch = Batches
else:
batch = Batches.search([
('active', '=', True),
('target', '=', target),
('prs.label', '=', label),
])
return batch or Batches.create({'target': target})
@api.model
def create(self, vals):
vals['batch_id'] = self._get_batch(target=vals['target'], label=vals['label']).id
pr = super().create(vals)
c = self.env['runbot_merge.commit'].search([('sha', '=', pr.head)])
pr._validate(c.statuses or '{}')
@@ -1108,8 +1122,17 @@ class PullRequests(models.Model):
('prs', '=', True),
])
}))
if vals.get('closed'):
vals['reviewed_by'] = False
match vals.get('closed'):
case True if not self.closed:
vals['reviewed_by'] = False
vals['batch_id'] = False
if len(self.batch_id.prs) == 1:
self.env.cr.precommit.add(self.batch_id.unlink)
case False if self.closed:
vals['batch_id'] = self._get_batch(
target=vals.get('target') or self.target.id,
label=vals.get('label') or self.label,
)
w = super().write(vals)
newhead = vals.get('head')
@@ -1231,16 +1254,13 @@ class PullRequests(models.Model):
""" If the PR is staged, cancel the staging. If the PR is split and
waiting, remove it from the split (possibly delete the split entirely)
"""
split_batches = self.with_context(active_test=False).mapped('batch_ids').filtered('split_id')
if len(split_batches) > 1:
_logger.warning("Found a PR linked with more than one split batch: %s (%s)", self, split_batches)
for b in split_batches:
if len(b.split_id.batch_ids) == 1:
# only the batch of this PR -> delete split
b.split_id.unlink()
else:
# else remove this batch from the split
b.split_id = False
split = self.batch_id.split_id
if len(split.batch_ids) == 1:
# only the batch of this PR -> delete split
split.unlink()
else:
# else remove this batch from the split
self.batch_id.split_id = False
self.staging_id.cancel('%s ' + reason, self.display_name, *args)
@@ -1248,21 +1268,29 @@
# ignore if the PR is already being updated in a separate transaction
# (most likely being merged?)
self.env.cr.execute('''
SELECT id, state FROM runbot_merge_pull_requests
SELECT id, state, batch_id FROM runbot_merge_pull_requests
WHERE id = %s AND state != 'merged'
FOR UPDATE SKIP LOCKED;
''', [self.id])
if not self.env.cr.fetchone():
r = self.env.cr.fetchone()
if not r:
return False
self.env.cr.execute('''
UPDATE runbot_merge_pull_requests
SET closed=True, state = 'closed', reviewed_by = null
SET closed=True, reviewed_by = null
WHERE id = %s
''', [self.id])
self.env.cr.commit()
self.modified(['closed', 'state', 'reviewed_by'])
self.modified(['closed', 'reviewed_by', 'batch_id'])
self.unstage("closed by %s", by)
# PRs should leave their batch when *closed*, and batches must die when
# no PR is linked
old_batch = self.batch_id
self.batch_id = False
if not old_batch.prs:
old_batch.unlink()
return True
# state changes on reviews
@@ -1571,10 +1599,7 @@ class Stagings(models.Model):
target = fields.Many2one('runbot_merge.branch', required=True, index=True)
batch_ids = fields.One2many(
'runbot_merge.batch', 'staging_id',
context={'active_test': False},
)
batch_ids = fields.Many2many('runbot_merge.batch', context={'active_test': False})
pr_ids = fields.One2many('runbot_merge.pull_requests', compute='_compute_prs')
state = fields.Selection([
('success', 'Success'),
@@ -1972,34 +1997,6 @@ class Split(models.Model):
target = fields.Many2one('runbot_merge.branch', required=True)
batch_ids = fields.One2many('runbot_merge.batch', 'split_id', context={'active_test': False})
class Batch(models.Model):
""" A batch is a "horizontal" grouping of *codependent* PRs: PRs with
the same label & target but for different repositories. These are
assumed to be part of the same "change" smeared over multiple
repositories e.g. change an API in repo1, this breaks use of that API
in repo2 which now needs to be updated.
"""
_name = _description = 'runbot_merge.batch'
target = fields.Many2one('runbot_merge.branch', required=True, index=True)
staging_id = fields.Many2one('runbot_merge.stagings', index=True)
split_id = fields.Many2one('runbot_merge.split', index=True)
prs = fields.Many2many('runbot_merge.pull_requests')
active = fields.Boolean(default=True)
@api.constrains('target', 'prs')
def _check_prs(self):
for batch in self:
repos = self.env['runbot_merge.repository']
for pr in batch.prs:
if pr.target != batch.target:
raise ValidationError("A batch and its PRs must have the same branch, got %s and %s" % (batch.target, pr.target))
if pr.repository in repos:
raise ValidationError("All prs of a batch must have different target repositories, got a duplicate %s on %s" % (pr.repository, pr))
repos |= pr.repository
class FetchJob(models.Model):
_name = _description = 'runbot_merge.fetch_job'


@@ -15,7 +15,8 @@ from werkzeug.datastructures import Headers
from odoo import api, models, fields
from odoo.tools import OrderedSet
from .pull_requests import Branch, Stagings, PullRequests, Repository, Batch
from .pull_requests import Branch, Stagings, PullRequests, Repository
from .batch import Batch
from .. import exceptions, utils, github, git
WAIT_FOR_VISIBILITY = [10, 10, 10, 10]
@@ -285,8 +286,10 @@ def stage_batches(branch: Branch, batched_prs: List[PullRequests], staging_state
if len(staged) >= batch_limit:
break
assert len(bs := {p.batch_id for p in batch}) == 1,\
f"expected all PRs to have the same batch, got {bs}"
try:
staged |= stage_batch(env, batch, staging_state)
staged |= stage_batch(env, batch[0].batch_id, staging_state)
except exceptions.MergeError as e:
pr = e.args[0]
_logger.info("Failed to stage %s into %s", pr.display_name, branch.name, exc_info=True)
@@ -335,16 +338,18 @@ def parse_refs_smart(read: Callable[[int], bytes]) -> Iterator[Tuple[str, str]]:
UNCHECKABLE = ['merge_method', 'overrides', 'draft']
def stage_batch(env: api.Environment, prs: PullRequests, staging: StagingState) -> Batch:
def stage_batch(env: api.Environment, batch: Batch, staging: StagingState):
"""Stages the batch represented by the ``prs`` recordset, onto the
current corresponding staging heads.
Alongside returning the newly created batch, updates ``staging[*].head``
in-place on success. On failure, the heads should not be touched.
May return an empty recordset on some non-fatal failures.
"""
new_heads: Dict[PullRequests, str] = {}
pr_fields = env['runbot_merge.pull_requests']._fields
for pr in prs:
for pr in batch.prs:
info = staging[pr.repository]
_logger.info(
"Staging pr %s for target %s; method=%s",
@@ -353,7 +358,7 @@ def stage_batch(env: api.Environment, prs: PullRequests, staging: StagingState)
)
try:
method, new_heads[pr] = stage(pr, info, related_prs=(prs - pr))
method, new_heads[pr] = stage(pr, info, related_prs=(batch.prs - pr))
_logger.info(
"Staged pr %s to %s by %s: %s -> %s",
pr.display_name, pr.target.name, method,
@@ -382,10 +387,7 @@ def stage_batch(env: api.Environment, prs: PullRequests, staging: StagingState)
# update meta to new heads
for pr, head in new_heads.items():
staging[pr.repository].head = head
return env['runbot_merge.batch'].create({
'target': prs[0].target.id,
'prs': [(4, pr.id, 0) for pr in prs],
})
return batch
def format_for_difflib(items: Iterator[Tuple[str, object]]) -> Iterator[str]:
""" Bit of a pain in the ass because difflib really wants


@@ -838,8 +838,6 @@ class TestBlocked:
p = to_pr(env, pr)
assert p.blocked
with repo_b: b.close()
# FIXME: find a way for PR.blocked to depend on linked PR somehow so this isn't needed
p.invalidate_cache(['blocked'], [p.id])
assert not p.blocked
def test_linked_merged(self, env, repo_a, repo_b, config):


@@ -331,7 +331,7 @@
<template id="view_pull_request_info_error">
<div class="alert alert-danger">
Error:
<span t-esc="pr.with_context(active_test=False).batch_ids[-1:].staging_id.reason">
<span t-esc="pr.with_context(active_test=False).batch_id.staging_ids[-1:].reason">
Unable to stage PR
</span>
</div>