[IMP] runbot_merge: split staging heads out to join tables

Currently the heads of a staging (both staging heads and merged heads)
are just JSON data on the staging itself. Historically this was
convenient, as the heads were mostly of use to the staging process and
were thus accessed almost exclusively through the staging itself.

However this makes finding stagings from merged commits (e.g. for
forensic research) almost impossible, because querying based on the
*values* of a JSON map is expensive, and indexing it is difficult.
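
For illustration, this is roughly what such a lookup requires today (a
sketch in Odoo-shell terms; `sha` is a hypothetical full commit oid):

import json

Stagings = env['runbot_merge.stagings']
# substring-scan every staging's JSON blob, then re-check in Python,
# since only the *keys* of a JSON map can be addressed cheaply
stagings = Stagings.search([('heads', 'ilike', sha)]).filtered(
    lambda s: any(
        head == sha
        for repo, head in json.loads(s.heads).items()
        if not repo.endswith('^')
    )
)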

To make this use case more feasible, split the `heads` field into two
join tables, one for the staging heads and one for the merged heads.
This makes looking up stagings by commit much more efficient, although
the queries may not be trivial. Also add two utility RPC methods, so
that it's possible to query stagings reasonably easily and efficiently
based on a set of commits (branch heads).
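
For example, a client can then look stagings up over standard Odoo
XML-RPC (a minimal sketch; url, database and credentials are placeholders):

import xmlrpc.client

url, db, login, password = 'https://mergebot.example.com', 'mergebot', 'user', 'secret'
common = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/common')
uid = common.authenticate(db, login, password, {})
models = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/object')

# for_heads matches staging heads (which may be uniquifiers), for_commits
# matches the actually-merged commits; both take git oids and return ids
staging_ids = models.execute_kw(
    db, uid, password, 'runbot_merge.stagings', 'for_heads',
    ['6942577537d0c625e6a48dd4b4a71a1e82e3915d'],  # hypothetical oid
)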

related to #768
Xavier Morel 2023-08-10 09:02:37 +02:00
parent cdffa83191
commit b1af2e573a
6 changed files with 246 additions and 117 deletions

View File

@@ -1,6 +1,6 @@
{
'name': 'merge bot',
'version': '1.7',
'version': '1.8',
'depends': ['contacts', 'website'],
'data': [
'security/security.xml',

View File

@@ -0,0 +1,6 @@
from pathlib import Path

def migrate(cr, version):
    sql = Path(__file__).parent.joinpath('upgrade.sql')\
        .read_text(encoding='utf-8')
    cr.execute(sql)
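
Odoo picks this hook up automatically: when the module version in the
manifest changes (the 1.7 → 1.8 bump above), `pre-*.py`/`post-*.py`
scripts under `migrations/<version>/` are run with a bare cursor. A
hedged sketch of exercising the script standalone against a test
database (the path and database name are assumptions):

import psycopg2
from importlib import util as importlib_util

# hypothetical location; the series prefix depends on the Odoo version
spec = importlib_util.spec_from_file_location(
    'pre_migration', 'runbot_merge/migrations/15.0.1.8/pre-migration.py')
mod = importlib_util.module_from_spec(spec)
spec.loader.exec_module(mod)

with psycopg2.connect('dbname=runbot_test') as con, con.cursor() as cr:
    mod.migrate(cr, '1.7')  # the version argument is unused by this script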

View File

@@ -0,0 +1,62 @@
CREATE TABLE runbot_merge_stagings_commits (
id serial NOT NULL,
staging_id integer not null references runbot_merge_stagings (id),
commit_id integer not null references runbot_merge_commit (id),
repository_id integer not null references runbot_merge_repository (id)
);
CREATE TABLE runbot_merge_stagings_heads (
id serial NOT NULL,
staging_id integer NOT NULL REFERENCES runbot_merge_stagings (id),
commit_id integer NOT NULL REFERENCES runbot_merge_commit (id),
repository_id integer NOT NULL REFERENCES runbot_merge_repository (id)
);
-- some of the older stagings only have the head, not the commit,
-- add the commit
UPDATE runbot_merge_stagings
SET heads = heads::jsonb || jsonb_build_object(
'odoo/odoo^', heads::json->'odoo/odoo',
'odoo/enterprise^', heads::json->'odoo/enterprise'
)
WHERE heads NOT ILIKE '%^%';
-- some of the stagings have heads which don't exist in the commits table,
-- because they never got a status from the runbot...
-- create fake commits so we don't lose heads
INSERT INTO runbot_merge_commit (sha, statuses, create_uid, create_date, write_uid, write_date)
SELECT r.value, '{}', s.create_uid, s.create_date, s.create_uid, s.create_date
FROM runbot_merge_stagings s,
json_each_text(s.heads::json) r
ON CONFLICT DO NOTHING;
CREATE TEMPORARY TABLE staging_commits (
id integer NOT NULL,
repo integer NOT NULL,
-- the staging head (may be a dedup, may be the same as commit)
head integer NOT NULL,
-- the staged commit
commit integer NOT NULL
);
-- the splatting works entirely off of the staged head
-- (the one without the ^ suffix); we concatenate the `^` to get the
-- corresponding merge head (the actual commit to push to the branch)
INSERT INTO staging_commits (id, repo, head, commit)
SELECT s.id, re.id AS repo, h.id AS head, c.id AS commit
FROM runbot_merge_stagings s,
json_each_text(s.heads::json) r,
runbot_merge_commit h,
runbot_merge_commit c,
runbot_merge_repository re
WHERE r.key NOT ILIKE '%^'
AND re.name = r.key
AND h.sha = r.value
AND c.sha = s.heads::json->>(r.key || '^');
INSERT INTO runbot_merge_stagings_heads (staging_id, repository_id, commit_id)
SELECT id, repo, head FROM staging_commits;
INSERT INTO runbot_merge_stagings_commits (staging_id, repository_id, commit_id)
SELECT id, repo, commit FROM staging_commits;
ALTER TABLE runbot_merge_stagings DROP COLUMN heads;
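
Once migrated, forensic lookups become plain relational queries; a
hedged sanity check (psycopg2; database name and oid are placeholders):

import psycopg2

with psycopg2.connect('dbname=mergebot') as con, con.cursor() as cr:
    cr.execute("""
        SELECT sc.staging_id, r.name
          FROM runbot_merge_stagings_commits sc
          JOIN runbot_merge_commit c ON c.id = sc.commit_id
          JOIN runbot_merge_repository r ON r.id = sc.repository_id
         WHERE c.sha = %s
    """, ['deadbeef'])  # hypothetical merged-commit oid
    print(cr.fetchall())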

View File

@@ -408,18 +408,19 @@ class Branch(models.Model):
if not staged:
return
heads = {}
heads = []
heads_map = {}
commits = []
for repo, it in meta.items():
tree = it['gh'].commit(it['head'])['tree']
# ensures staging branches are unique and always
# rebuilt
r = base64.b64encode(os.urandom(12)).decode('ascii')
trailer = ''
if heads:
if heads_map:
trailer = '\n'.join(
'Runbot-dependency: %s:%s' % (repo, h)
for repo, h in heads.items()
if not repo.endswith('^')
for repo, h in heads_map.items()
)
dummy_head = {'sha': it['head']}
if it['head'] == original_heads[repo]:
@@ -435,28 +436,50 @@ For-Commit-Id: %s
'parents': [it['head']],
}).json()
# $repo is the head to check, $repo^ is the head to merge (they
# might be the same)
heads[repo.name + '^'] = it['head']
heads[repo.name] = dummy_head['sha']
self.env.cr.execute(
"INSERT INTO runbot_merge_commit (sha, to_check, statuses) "
"VALUES (%s, true, '{}') "
"ON CONFLICT (sha) DO UPDATE SET to_check=true",
[dummy_head['sha']]
)
# special case if the two commits are identical, because otherwise
# postgres raises the error "ensure that no rows proposed for insertion
# within the same command have duplicate constrained values"
if it['head'] == dummy_head['sha']:
self.env.cr.execute(
"INSERT INTO runbot_merge_commit (sha, to_check, statuses) "
"VALUES (%s, true, '{}') "
"ON CONFLICT (sha) DO UPDATE SET to_check=true "
"RETURNING id",
[it['head']]
)
[commit] = [head] = self.env.cr.fetchone()
else:
self.env.cr.execute(
"INSERT INTO runbot_merge_commit (sha, to_check, statuses) "
"VALUES (%s, false, '{}'), (%s, true, '{}') "
"ON CONFLICT (sha) DO UPDATE SET to_check=true "
"RETURNING id",
[it['head'], dummy_head['sha']]
)
([commit], [head]) = self.env.cr.fetchall()
heads_map[repo.name] = dummy_head['sha']
heads.append(fields.Command.create({
'repository_id': repo.id,
'commit_id': head,
}))
commits.append(fields.Command.create({
'repository_id': repo.id,
'commit_id': commit,
}))
# create actual staging object
st = self.env['runbot_merge.stagings'].create({
'target': self.id,
'batch_ids': [(4, batch.id, 0) for batch in staged],
'heads': json.dumps(heads)
'heads': heads,
'commits': commits,
})
# create staging branch from tmp
token = self.project_id.github_token
for r in self.project_id.repo_ids.having_branch(self):
it = meta[r]
staging_head = heads[r.name]
staging_head = heads_map[r.name]
_logger.info(
"%s: create staging for %s:%s at %s",
self.project_id.name, r.name, self.name,
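
The special case above works around a Postgres restriction worth
knowing: a single INSERT ... ON CONFLICT DO UPDATE may not affect the
same row twice. A minimal standalone reproduction (psycopg2; the
scratch database is a placeholder):

import psycopg2

con = psycopg2.connect('dbname=scratch')
con.autocommit = True
with con.cursor() as cr:
    cr.execute("CREATE TABLE demo (sha text PRIMARY KEY, to_check bool)")
    cr.execute("INSERT INTO demo VALUES ('abc', false)")
    try:
        # both proposed rows conflict with the same pre-existing row
        cr.execute(
            "INSERT INTO demo VALUES ('abc', true), ('abc', true) "
            "ON CONFLICT (sha) DO UPDATE SET to_check = excluded.to_check")
    except psycopg2.errors.CardinalityViolation as e:
        print(e)  # "ON CONFLICT DO UPDATE command cannot affect row a second time"
con.close()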
@@ -1716,13 +1739,7 @@ class Commit(models.Model):
if pr:
pr._validate(st)
stagings = Stagings.search([('heads', 'ilike', c.sha)]).filtered(
lambda s, h=c.sha: any(
head == h
for repo, head in json.loads(s.heads).items()
if not repo.endswith('^')
)
)
stagings = Stagings.search([('head_ids.sha', '=', c.sha)])
if stagings:
stagings._validate()
except Exception:
@@ -1748,6 +1765,53 @@ class Commit(models.Model):
""")
return res
class StagingCommits(models.Model):
    _name = 'runbot_merge.stagings.commits'
    _description = "Mergeable commits for stagings, always the actually merged " \
                   "commit, never a uniquifier"
    _log_access = False

    staging_id = fields.Many2one('runbot_merge.stagings', required=True)
    commit_id = fields.Many2one('runbot_merge.commit', index=True, required=True)
    repository_id = fields.Many2one('runbot_merge.repository', required=True)

    def _auto_init(self):
        super()._auto_init()
        # the same commit can be both head and tip (?)
        tools.create_unique_index(
            self.env.cr, self._table + "_unique",
            self._table, ['staging_id', 'commit_id']
        )
        # there should be one head per staging per repository, unless one is a
        # real head and one is a uniquifier head
        tools.create_unique_index(
            self.env.cr, self._table + "_unique_per_repo",
            self._table, ['staging_id', 'repository_id'],
        )

class StagingHeads(models.Model):
    _name = 'runbot_merge.stagings.heads'
    _description = "Staging heads, may be the staging's commit or may be a " \
                   "uniquifier (discarded on success)"
    _log_access = False

    staging_id = fields.Many2one('runbot_merge.stagings', required=True)
    commit_id = fields.Many2one('runbot_merge.commit', index=True, required=True)
    repository_id = fields.Many2one('runbot_merge.repository', required=True)

    def _auto_init(self):
        super()._auto_init()
        # the same commit can be both head and tip (?)
        tools.create_unique_index(
            self.env.cr, self._table + "_unique",
            self._table, ['staging_id', 'commit_id']
        )
        # there should be one head per staging per repository, unless one is a
        # real head and one is a uniquifier head
        tools.create_unique_index(
            self.env.cr, self._table + "_unique_per_repo",
            self._table, ['staging_id', 'repository_id'],
        )
class Stagings(models.Model):
_name = _description = 'runbot_merge.stagings'
@@ -1771,9 +1835,10 @@ class Stagings(models.Model):
timeout_limit = fields.Datetime(store=True, compute='_compute_timeout_limit')
reason = fields.Text("Reason for final state (if any)")
# seems simpler than adding yet another indirection through a model
heads = fields.Char(required=True, help="JSON-encoded map of heads, one per repo in the project")
head_ids = fields.Many2many('runbot_merge.commit', compute='_compute_statuses')
head_ids = fields.Many2many('runbot_merge.commit', relation='runbot_merge_stagings_heads', column1='staging_id', column2='commit_id')
heads = fields.One2many('runbot_merge.stagings.heads', 'staging_id')
commit_ids = fields.Many2many('runbot_merge.commit', relation='runbot_merge_stagings_commits', column1='staging_id', column2='commit_id')
commits = fields.One2many('runbot_merge.stagings.commits', 'staging_id')
statuses = fields.Binary(compute='_compute_statuses')
statuses_cache = fields.Text()
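
Note the pairing: `head_ids`/`heads` and `commit_ids`/`commits` are two
views over the same join tables, a Many2many straight to the commits and
a One2many to the join model carrying the repository. A small
consistency sketch (Odoo shell; the staging id is a placeholder):

st = env['runbot_merge.stagings'].browse(1)  # hypothetical id
assert set(st.head_ids.mapped('sha')) == set(st.heads.mapped('commit_id.sha'))
assert set(st.commit_ids.mapped('sha')) == set(st.commits.mapped('commit_id.sha'))
# the repository of each head is only reachable through the One2many
for h in st.heads:
    print(h.repository_id.name, h.commit_id.sha)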
@@ -1786,7 +1851,7 @@ class Stagings(models.Model):
return super().write(vals)
previously_pending = self.filtered(lambda s: s.state == 'pending')
super(Stagings, self).write(vals)
super().write(vals)
for staging in previously_pending:
if staging.state != 'pending':
super(Stagings, staging).write({
@@ -1795,7 +1860,6 @@ class Stagings(models.Model):
return True
def name_get(self):
return [
(staging.id, "%d (%s, %s%s)" % (
@@ -1812,33 +1876,18 @@ class Stagings(models.Model):
""" Fetches statuses associated with the various heads, returned as
(repo, context, state, url)
"""
Commits = self.env['runbot_merge.commit']
heads = {
head: repo
for st in self
for repo, head in json.loads(st.heads).items()
if not repo.endswith('^')
}
all_heads = Commits.search([('sha', 'in', list(heads))])
commits_map = {commit.sha: commit.id for commit in all_heads}
heads = {h.commit_id: h.repository_id for h in self.mapped('heads')}
all_heads = self.mapped('head_ids')
for st in self:
commits = st.head_ids = Commits._browse(
self.env,
tuple(
commits_map[h]
for repo, h in json.loads(st.heads).items()
if not repo.endswith('^')
),
all_heads._prefetch_ids
)
if st.statuses_cache:
st.statuses = json.loads(st.statuses_cache)
continue
commits = st.head_ids.with_prefetch(all_heads._prefetch_ids)
st.statuses = [
(
heads[commit.sha],
heads[commit].name,
context,
status.get('state') or 'pending',
status.get('target_url') or ''
@@ -1865,27 +1914,17 @@ class Stagings(models.Model):
staging.pr_ids = staging.batch_ids.prs
def _validate(self):
Commits = self.env['runbot_merge.commit']
for s in self:
if s.state != 'pending':
continue
repos = {
repo.name: repo
for repo in self.env['runbot_merge.repository'].search([])
.having_branch(s.target)
}
# maps commits to the statuses they need
required_statuses = [
(head, repos[repo].status_ids._for_staging(s).mapped('context'))
for repo, head in json.loads(s.heads).items()
if not repo.endswith('^')
(h.commit_id.sha, h.repository_id.status_ids._for_staging(s).mapped('context'))
for h in s.heads
]
# maps commits to their statuses
cmap = {
c.sha: json.loads(c.statuses)
for c in Commits.search([('sha', 'in', [h for h, _ in required_statuses])])
}
cmap = {c.sha: json.loads(c.statuses) for c in s.head_ids}
update_timeout_limit = False
st = 'success'
@@ -1983,19 +2022,10 @@ class Stagings(models.Model):
return False
# try inferring which PR failed and only mark that one
for repo, head in json.loads(self.heads).items():
if repo.endswith('^'):
continue
for head in self.heads:
required_statuses = set(head.repository_id.status_ids._for_staging(self).mapped('context'))
required_statuses = set(
self.env['runbot_merge.repository']
.search([('name', '=', repo)])
.status_ids
._for_staging(self)
.mapped('context'))
commit = self.env['runbot_merge.commit'].search([('sha', '=', head)])
statuses = json.loads(commit.statuses or '{}')
statuses = json.loads(head.commit_id.statuses or '{}')
reason = next((
ctx for ctx, result in statuses.items()
if ctx in required_statuses
@@ -2004,10 +2034,7 @@ class Stagings(models.Model):
if not reason:
continue
pr = next((
pr for pr in self.batch_ids.prs
if pr.repository.name == repo
), None)
pr = next((pr for pr in self.batch_ids.prs if pr.repository == head.repository_id), None)
status = to_status(statuses[reason])
viewmore = ''
@@ -2016,7 +2043,7 @@ class Stagings(models.Model):
if pr:
self.fail("%s%s" % (reason, viewmore), pr)
else:
self.fail('%s on %s%s' % (reason, head, viewmore))
self.fail('%s on %s%s' % (reason, head.commit_id.sha, viewmore))
return False
# the staging failed but we don't have a specific culprit, fail
@@ -2043,7 +2070,6 @@ class Stagings(models.Model):
project = self.target.project_id
if self.state == 'success':
gh = {repo.name: repo.github() for repo in project.repo_ids.having_branch(self.target)}
staging_heads = json.loads(self.heads)
self.env.cr.execute('''
SELECT 1 FROM runbot_merge_pull_requests
WHERE id in %s
@@ -2053,7 +2079,7 @@ class Stagings(models.Model):
with sentry_sdk.start_span(description="merge staging") as span:
span.set_tag("staging", self.id)
span.set_tag("branch", self.target.name)
self._safety_dance(gh, staging_heads)
self._safety_dance(gh, self.commits)
except exceptions.FastForwardError as e:
logger.warning(
"Could not fast-forward successful staging on %s:%s",
@@ -2100,7 +2126,7 @@ class Stagings(models.Model):
def is_timed_out(self):
return fields.Datetime.from_string(self.timeout_limit) < datetime.datetime.now()
def _safety_dance(self, gh, staging_heads):
def _safety_dance(self, gh, staging_commits: StagingCommits):
""" Reverting updates doesn't work if the branches are protected
(because a revert is basically a force push). So we can update
REPO_A, then fail to update REPO_B for some reason, and we're hosed.
@@ -2117,51 +2143,69 @@ class Stagings(models.Model):
bad. In that case, wait a bit and retry for now. A more complex
strategy (including disabling the branch entirely until somebody
has looked at and fixed the issue) might be necessary.
:returns: the last repo it tried to update (probably the one on which
it failed, if it failed)
"""
# FIXME: would make sense for FFE to be richer, and contain the repo name
repo_name = None
tmp_target = 'tmp.' + self.target.name
# first force-push the current targets to all tmps
for repo_name in staging_heads.keys():
if repo_name.endswith('^'):
continue
for repo_name in staging_commits.mapped('repository_id.name'):
g = gh[repo_name]
g.set_ref(tmp_target, g.head(self.target.name))
# then attempt to FF the tmp to the staging
for repo_name, head in staging_heads.items():
if repo_name.endswith('^'):
continue
gh[repo_name].fast_forward(tmp_target, staging_heads.get(repo_name + '^') or head)
# then attempt to FF the tmp to the staging commits
for c in staging_commits:
gh[c.repository_id.name].fast_forward(tmp_target, c.commit_id.sha)
# there is still a race condition here, but it's way
# lower than "the entire staging duration"...
first = True
for repo_name, head in staging_heads.items():
if repo_name.endswith('^'):
continue
for i, c in enumerate(staging_commits):
for pause in [0.1, 0.3, 0.5, 0.9, 0]: # last one must be 0/falsy or we lose the exception
try:
# if the staging has a $repo^ head, merge that,
# otherwise merge the regular (CI'd) head
gh[repo_name].fast_forward(
gh[c.repository_id.name].fast_forward(
self.target.name,
staging_heads.get(repo_name + '^') or head
c.commit_id.sha
)
except exceptions.FastForwardError:
# The GH API regularly fails us. If the failure does not
# occur on the first repository, retry a few times with a
# little pause.
if not first and pause:
if i and pause:
time.sleep(pause)
continue
raise
else:
break
first = False
return repo_name
    @api.returns('runbot_merge.stagings')
    def for_heads(self, *heads):
        """Returns the staging(s) with all the specified heads. Heads should
        be unique git oids.
        """
        if not heads:
            return self.browse(())

        joins = ''.join(
            f'\nJOIN runbot_merge_stagings_heads h{i} ON h{i}.staging_id = s.id'
            f'\nJOIN runbot_merge_commit c{i} ON c{i}.id = h{i}.commit_id AND c{i}.sha = %s\n'
            for i in range(len(heads))
        )
        self.env.cr.execute(f"SELECT s.id FROM runbot_merge_stagings s {joins}", heads)
        stagings = self.browse(id for [id] in self.env.cr.fetchall())
        stagings.check_access_rights('read')
        stagings.check_access_rule('read')
        return stagings

    @api.returns('runbot_merge.stagings')
    def for_commits(self, *heads):
        """Returns the staging(s) with all the specified commits (heads which
        have actually been merged). Commits should be unique git oids.
        """
        if not heads:
            return self.browse(())

        joins = ''.join(
            f'\nJOIN runbot_merge_stagings_commits h{i} ON h{i}.staging_id = s.id'
            f'\nJOIN runbot_merge_commit c{i} ON c{i}.id = h{i}.commit_id AND c{i}.sha = %s\n'
            for i in range(len(heads))
        )
        self.env.cr.execute(f"SELECT s.id FROM runbot_merge_stagings s {joins}", heads)
        stagings = self.browse(id for [id] in self.env.cr.fetchall())
        stagings.check_access_rights('read')
        stagings.check_access_rule('read')
        return stagings
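
For n oids the built query chains one join pair per sha; a usage sketch
from an Odoo shell (the oids are placeholders):

# generated SQL for two heads, roughly:
#   SELECT s.id FROM runbot_merge_stagings s
#   JOIN runbot_merge_stagings_heads h0 ON h0.staging_id = s.id
#   JOIN runbot_merge_commit c0 ON c0.id = h0.commit_id AND c0.sha = %s
#   JOIN runbot_merge_stagings_heads h1 ON h1.staging_id = s.id
#   JOIN runbot_merge_commit c1 ON c1.id = h1.commit_id AND c1.sha = %s
stagings = env['runbot_merge.stagings'].for_heads(
    '6942577537d0c625e6a48dd4b4a71a1e82e3915d',
    'aa2eeb47e97a76fe9e0ec6e83b5d83d2c0cbd364',
)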
class Split(models.Model):
_name = _description = 'runbot_merge.split'

View File

@@ -11,6 +11,8 @@ access_runbot_merge_pull_requests_admin,Admin access to PR,model_runbot_merge_pu
access_runbot_merge_pull_requests_tagging_admin,Admin access to tagging,model_runbot_merge_pull_requests_tagging,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_commit_admin,Admin access to commits,model_runbot_merge_commit,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_stagings_admin,Admin access to stagings,model_runbot_merge_stagings,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_stagings_heads_admin,Admin access to staging heads,model_runbot_merge_stagings_heads,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_stagings_commits_admin,Admin access to staging commits,model_runbot_merge_stagings_commits,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_stagings_cancel_admin,Admin access to cancelling stagings,model_runbot_merge_stagings_cancel,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_split_admin,Admin access to splits,model_runbot_merge_split,runbot_merge.group_admin,1,1,1,1
access_runbot_merge_batch_admin,Admin access to batches,model_runbot_merge_batch,runbot_merge.group_admin,1,1,1,1


View File

@@ -373,14 +373,29 @@ def test_sub_match(env, project, repo_a, repo_b, repo_c, config):
a_staging = repo_a.commit('staging.master')
b_staging = repo_b.commit('staging.master')
c_staging = repo_c.commit('staging.master')
assert json.loads(st.heads) == {
repo_a.name: a_staging.id,
repo_a.name + '^': a_staging.parents[0],
repo_b.name: b_staging.id,
repo_b.name + '^': b_staging.id,
repo_c.name: c_staging.id,
repo_c.name + '^': c_staging.id,
}
assert sorted(st.head_ids.mapped('sha')) == sorted([
a_staging.id,
b_staging.id,
c_staging.id,
])
s = env['runbot_merge.stagings'].for_heads(
a_staging.id,
b_staging.id,
c_staging.id,
)
assert s == list(st.ids)
assert sorted(st.commit_ids.mapped('sha')) == sorted([
a_staging.parents[0],
b_staging.id,
c_staging.id,
])
s = env['runbot_merge.stagings'].for_commits(
a_staging.parents[0],
b_staging.id,
c_staging.id,
)
assert s == list(st.ids)
def test_merge_fail(env, project, repo_a, repo_b, users, config):
""" In a matched-branch scenario, if merging in one of the linked repos