From b1af2e573a68e603c6c82393bd5382bd25e49cd8 Mon Sep 17 00:00:00 2001
From: Xavier Morel
Date: Thu, 10 Aug 2023 09:02:37 +0200
Subject: [PATCH] [IMP] runbot_merge: split staging heads out to join tables

Currently the heads of a staging (both the staging heads and the merged
heads) are just JSON data on the staging itself. Historically this was
convenient: the heads were mostly of use to the staging process itself,
and were thus accessed almost exclusively through the staging.

However, this makes finding stagings from merged commits (e.g. for
forensic research) almost impossible: querying based on the *values* of
a JSON map is expensive, and indexing it is difficult.

To make this use case more feasible, split the `heads` field into two
join tables, one for the staging heads and one for the merged heads.
This makes looking up stagings by commit much more efficient (although
the queries may not be trivial).

Also add two utility RPC methods, so that it's possible to query
stagings reasonably easily and efficiently based on a set of commits
(branch heads).

related to #768
---
 runbot_merge/__manifest__.py                  |   2 +-
 .../migrations/15.0.1.8/pre-migration.py      |   6 +
 runbot_merge/migrations/15.0.1.8/upgrade.sql  |  62 +++++
 runbot_merge/models/pull_requests.py          | 260 ++++++++++--------
 runbot_merge/security/ir.model.access.csv     |   2 +
 runbot_merge/tests/test_multirepo.py          |  31 ++-
 6 files changed, 246 insertions(+), 117 deletions(-)
 create mode 100644 runbot_merge/migrations/15.0.1.8/pre-migration.py
 create mode 100644 runbot_merge/migrations/15.0.1.8/upgrade.sql

diff --git a/runbot_merge/__manifest__.py b/runbot_merge/__manifest__.py
index 3640d249..04c23c7c 100644
--- a/runbot_merge/__manifest__.py
+++ b/runbot_merge/__manifest__.py
@@ -1,6 +1,6 @@
 {
     'name': 'merge bot',
-    'version': '1.7',
+    'version': '1.8',
     'depends': ['contacts', 'website'],
     'data': [
         'security/security.xml',
diff --git a/runbot_merge/migrations/15.0.1.8/pre-migration.py b/runbot_merge/migrations/15.0.1.8/pre-migration.py
new file mode 100644
index 00000000..8bf72c44
--- /dev/null
+++ b/runbot_merge/migrations/15.0.1.8/pre-migration.py
@@ -0,0 +1,6 @@
+from pathlib import Path
+
+def migrate(cr, version):
+    sql = Path(__file__).parent.joinpath('upgrade.sql')\
+        .read_text(encoding='utf-8')
+    cr.execute(sql)
diff --git a/runbot_merge/migrations/15.0.1.8/upgrade.sql b/runbot_merge/migrations/15.0.1.8/upgrade.sql
new file mode 100644
index 00000000..d5dc0502
--- /dev/null
+++ b/runbot_merge/migrations/15.0.1.8/upgrade.sql
@@ -0,0 +1,62 @@
+CREATE TABLE runbot_merge_stagings_commits (
+    id serial NOT NULL,
+    staging_id integer NOT NULL REFERENCES runbot_merge_stagings (id),
+    commit_id integer NOT NULL REFERENCES runbot_merge_commit (id),
+    repository_id integer NOT NULL REFERENCES runbot_merge_repository (id)
+);
+
+CREATE TABLE runbot_merge_stagings_heads (
+    id serial NOT NULL,
+    staging_id integer NOT NULL REFERENCES runbot_merge_stagings (id),
+    commit_id integer NOT NULL REFERENCES runbot_merge_commit (id),
+    repository_id integer NOT NULL REFERENCES runbot_merge_repository (id)
+);
+
+-- some of the older stagings only have the head, not the commit,
+-- add the commit
+UPDATE runbot_merge_stagings
+   SET heads = heads::jsonb || jsonb_build_object(
+       'odoo/odoo^', heads::json->'odoo/odoo',
+       'odoo/enterprise^', heads::json->'odoo/enterprise'
+   )
+ WHERE heads NOT ILIKE '%^%';
+
+-- some of the stagings have heads which don't exist in the commits table,
+-- because they never got a status from the runbot...
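+-- (for illustration, such orphan heads could be listed with something like:
+--    SELECT s.id, r.value
+--      FROM runbot_merge_stagings s,
+--           json_each_text(s.heads::json) r
+--     WHERE NOT EXISTS (SELECT 1 FROM runbot_merge_commit c WHERE c.sha = r.value);
+-- )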
+-- create fake commits so we don't lose heads
+INSERT INTO runbot_merge_commit (sha, statuses, create_uid, create_date, write_uid, write_date)
+    SELECT r.value, '{}', s.create_uid, s.create_date, s.create_uid, s.create_date
+    FROM runbot_merge_stagings s,
+         json_each_text(s.heads::json) r
+ON CONFLICT DO NOTHING;
+
+CREATE TEMPORARY TABLE staging_commits (
+    id integer NOT NULL,
+    repo integer NOT NULL,
+    -- the staging head (may be a dedup, may be the same as commit)
+    head integer NOT NULL,
+    -- the staged commit
+    commit integer NOT NULL
+);
+-- the mapping works entirely off of the staging heads (the keys without the
+-- `^` suffix); we append `^` to the key to get the corresponding merged head
+-- (the actual commit to push to the branch)
+INSERT INTO staging_commits (id, repo, head, commit)
+    SELECT s.id, re.id AS repo, h.id AS head, c.id AS commit
+    FROM runbot_merge_stagings s,
+         json_each_text(s.heads::json) r,
+         runbot_merge_commit h,
+         runbot_merge_commit c,
+         runbot_merge_repository re
+    WHERE r.key NOT ILIKE '%^'
+      AND re.name = r.key
+      AND h.sha = r.value
+      AND c.sha = s.heads::json->>(r.key || '^');
+
+INSERT INTO runbot_merge_stagings_heads (staging_id, repository_id, commit_id)
+SELECT id, repo, head FROM staging_commits;
+
+INSERT INTO runbot_merge_stagings_commits (staging_id, repository_id, commit_id)
+SELECT id, repo, commit FROM staging_commits;
+
+ALTER TABLE runbot_merge_stagings DROP COLUMN heads;
diff --git a/runbot_merge/models/pull_requests.py b/runbot_merge/models/pull_requests.py
index 2873dbcf..0005704b 100644
--- a/runbot_merge/models/pull_requests.py
+++ b/runbot_merge/models/pull_requests.py
@@ -408,18 +408,19 @@ class Branch(models.Model):
         if not staged:
             return
 
-        heads = {}
+        heads = []
+        heads_map = {}
+        commits = []
         for repo, it in meta.items():
             tree = it['gh'].commit(it['head'])['tree']
             # ensures staging branches are unique and always
             # rebuilt
             r = base64.b64encode(os.urandom(12)).decode('ascii')
             trailer = ''
-            if heads:
+            if heads_map:
                 trailer = '\n'.join(
                     'Runbot-dependency: %s:%s' % (repo, h)
-                    for repo, h in heads.items()
-                    if not repo.endswith('^')
+                    for repo, h in heads_map.items()
                 )
             dummy_head = {'sha': it['head']}
             if it['head'] == original_heads[repo]:
@@ -435,28 +436,50 @@ For-Commit-Id: %s
                 'parents': [it['head']],
             }).json()
 
-            # $repo is the head to check, $repo^ is the head to merge (they
-            # might be the same)
-            heads[repo.name + '^'] = it['head']
-            heads[repo.name] = dummy_head['sha']
-            self.env.cr.execute(
-                "INSERT INTO runbot_merge_commit (sha, to_check, statuses) "
-                "VALUES (%s, true, '{}') "
-                "ON CONFLICT (sha) DO UPDATE SET to_check=true",
-                [dummy_head['sha']]
-            )
+            # special case if the two commits are identical, as otherwise
+            # postgres raises the error "ensure that no rows proposed for insertion
+            # within the same command have duplicate constrained values"
+            if it['head'] == dummy_head['sha']:
+                self.env.cr.execute(
+                    "INSERT INTO runbot_merge_commit (sha, to_check, statuses) "
+                    "VALUES (%s, true, '{}') "
+                    "ON CONFLICT (sha) DO UPDATE SET to_check=true "
+                    "RETURNING id",
+                    [it['head']]
+                )
+                [commit] = [head] = self.env.cr.fetchone()
+            else:
+                self.env.cr.execute(
+                    "INSERT INTO runbot_merge_commit (sha, to_check, statuses) "
+                    "VALUES (%s, false, '{}'), (%s, true, '{}') "
+                    "ON CONFLICT (sha) DO UPDATE SET to_check=true "
+                    "RETURNING id",
+                    [it['head'], dummy_head['sha']]
+                )
+                ([commit], [head]) = self.env.cr.fetchall()
+
+            heads_map[repo.name] = dummy_head['sha']
+            heads.append(fields.Command.create({
+                'repository_id': repo.id,
+                'commit_id': head,
+            }))
+            commits.append(fields.Command.create({
+                'repository_id': repo.id,
+                'commit_id': commit,
+            }))
 
         # create actual staging object
         st = self.env['runbot_merge.stagings'].create({
             'target': self.id,
             'batch_ids': [(4, batch.id, 0) for batch in staged],
-            'heads': json.dumps(heads)
+            'heads': heads,
+            'commits': commits,
         })
         # create staging branch from tmp
         token = self.project_id.github_token
         for r in self.project_id.repo_ids.having_branch(self):
             it = meta[r]
-            staging_head = heads[r.name]
+            staging_head = heads_map[r.name]
             _logger.info(
                 "%s: create staging for %s:%s at %s",
                 self.project_id.name, r.name, self.name,
@@ -1716,13 +1739,7 @@ class Commit(models.Model):
                     if pr:
                         pr._validate(st)
 
-                stagings = Stagings.search([('heads', 'ilike', c.sha)]).filtered(
-                    lambda s, h=c.sha: any(
-                        head == h
-                        for repo, head in json.loads(s.heads).items()
-                        if not repo.endswith('^')
-                    )
-                )
+                stagings = Stagings.search([('head_ids.sha', '=', c.sha)])
                 if stagings:
                     stagings._validate()
             except Exception:
@@ -1748,6 +1765,53 @@ class Commit(models.Model):
         """)
         return res
 
+class StagingCommits(models.Model):
+    _name = 'runbot_merge.stagings.commits'
+    _description = "Mergeable commits for stagings, always the actually merged " \
+                   "commit, never a uniquifier"
+    _log_access = False
+
+    staging_id = fields.Many2one('runbot_merge.stagings', required=True)
+    commit_id = fields.Many2one('runbot_merge.commit', index=True, required=True)
+    repository_id = fields.Many2one('runbot_merge.repository', required=True)
+
+    def _auto_init(self):
+        super()._auto_init()
+        # a commit can be both a staging head and a merged commit, but it
+        # appears at most once per staging in this table
+        tools.create_unique_index(
+            self.env.cr, self._table + "_unique",
+            self._table, ['staging_id', 'commit_id']
+        )
+        # and there should be exactly one merged commit per repository per
+        # staging
+        tools.create_unique_index(
+            self.env.cr, self._table + "_unique_per_repo",
+            self._table, ['staging_id', 'repository_id'],
+        )
+
+class StagingHeads(models.Model):
+    _name = 'runbot_merge.stagings.heads'
+    _description = "Staging heads, may be the staging's commit or may be a " \
+                   "uniquifier (discarded on success)"
+    _log_access = False
+
+    staging_id = fields.Many2one('runbot_merge.stagings', required=True)
+    commit_id = fields.Many2one('runbot_merge.commit', index=True, required=True)
+    repository_id = fields.Many2one('runbot_merge.repository', required=True)
+
+    def _auto_init(self):
+        super()._auto_init()
+        # a commit can be both a staging head and a merged commit, but it
+        # appears at most once per staging in this table
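+        # (for reference, and assuming the default table name derived from
+        # _name, this roughly amounts to:
+        #   CREATE UNIQUE INDEX runbot_merge_stagings_heads_unique
+        #       ON runbot_merge_stagings_heads (staging_id, commit_id);
+        # )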
+        tools.create_unique_index(
+            self.env.cr, self._table + "_unique",
+            self._table, ['staging_id', 'commit_id']
+        )
+        # and there should be exactly one head per repository per staging
+        # (that head may be the merged commit itself, or a uniquifier)
+        tools.create_unique_index(
+            self.env.cr, self._table + "_unique_per_repo",
+            self._table, ['staging_id', 'repository_id'],
+        )
+
 class Stagings(models.Model):
     _name = _description = 'runbot_merge.stagings'
@@ -1771,9 +1835,10 @@ class Stagings(models.Model):
     timeout_limit = fields.Datetime(store=True, compute='_compute_timeout_limit')
     reason = fields.Text("Reason for final state (if any)")
 
-    # seems simpler than adding yet another indirection through a model
-    heads = fields.Char(required=True, help="JSON-encoded map of heads, one per repo in the project")
-    head_ids = fields.Many2many('runbot_merge.commit', compute='_compute_statuses')
+    # head_ids are the staged heads (possibly uniquifiers), commit_ids are the
+    # commits which actually get merged on success; the One2many counterparts
+    # expose the join rows, which also carry the repository
+    head_ids = fields.Many2many('runbot_merge.commit', relation='runbot_merge_stagings_heads', column1='staging_id', column2='commit_id')
+    heads = fields.One2many('runbot_merge.stagings.heads', 'staging_id')
+    commit_ids = fields.Many2many('runbot_merge.commit', relation='runbot_merge_stagings_commits', column1='staging_id', column2='commit_id')
+    commits = fields.One2many('runbot_merge.stagings.commits', 'staging_id')
 
     statuses = fields.Binary(compute='_compute_statuses')
     statuses_cache = fields.Text()
@@ -1786,7 +1851,7 @@ class Stagings(models.Model):
             return super().write(vals)
 
         previously_pending = self.filtered(lambda s: s.state == 'pending')
-        super(Stagings, self).write(vals)
+        super().write(vals)
         for staging in previously_pending:
             if staging.state != 'pending':
                 super(Stagings, staging).write({
@@ -1795,7 +1860,6 @@ class Stagings(models.Model):
 
         return True
 
-
     def name_get(self):
         return [
             (staging.id, "%d (%s, %s%s)" % (
@@ -1812,33 +1876,18 @@
         """ Fetches statuses associated with the various heads, returned as
         (repo, context, state, url)
         """
-        Commits = self.env['runbot_merge.commit']
-        heads = {
-            head: repo
-            for st in self
-            for repo, head in json.loads(st.heads).items()
-            if not repo.endswith('^')
-        }
-        all_heads = Commits.search([('sha', 'in', list(heads))])
-        commits_map = {commit.sha: commit.id for commit in all_heads}
+        heads = {h.commit_id: h.repository_id for h in self.mapped('heads')}
+        all_heads = self.mapped('head_ids')
 
         for st in self:
-            commits = st.head_ids = Commits._browse(
-                self.env,
-                tuple(
-                    commits_map[h]
-                    for repo, h in json.loads(st.heads).items()
-                    if not repo.endswith('^')
-                ),
-                all_heads._prefetch_ids
-            )
             if st.statuses_cache:
                 st.statuses = json.loads(st.statuses_cache)
                 continue
 
+            commits = st.head_ids.with_prefetch(all_heads._prefetch_ids)
             st.statuses = [
                 (
-                    heads[commit.sha],
+                    heads[commit].name,
                     context,
                     status.get('state') or 'pending',
                     status.get('target_url') or ''
@@ -1865,27 +1914,17 @@ class Stagings(models.Model):
             staging.pr_ids = staging.batch_ids.prs
 
     def _validate(self):
-        Commits = self.env['runbot_merge.commit']
        for s in self:
             if s.state != 'pending':
                 continue
 
-            repos = {
-                repo.name: repo
-                for repo in self.env['runbot_merge.repository'].search([])
-                    .having_branch(s.target)
-            }
             # maps commits to the statuses they need
             required_statuses = [
-                (head, repos[repo].status_ids._for_staging(s).mapped('context'))
-                for repo, head in json.loads(s.heads).items()
-                if not repo.endswith('^')
+                (h.commit_id.sha, h.repository_id.status_ids._for_staging(s).mapped('context'))
+                for h in s.heads
             ]
             # maps commits to their statuses
-            cmap = {
-                c.sha: 
json.loads(c.statuses) - for c in Commits.search([('sha', 'in', [h for h, _ in required_statuses])]) - } + cmap = {c.sha: json.loads(c.statuses) for c in s.head_ids} update_timeout_limit = False st = 'success' @@ -1983,19 +2022,10 @@ class Stagings(models.Model): return False # try inferring which PR failed and only mark that one - for repo, head in json.loads(self.heads).items(): - if repo.endswith('^'): - continue + for head in self.heads: + required_statuses = set(head.repository_id.status_ids._for_staging(self).mapped('context')) - required_statuses = set( - self.env['runbot_merge.repository'] - .search([('name', '=', repo)]) - .status_ids - ._for_staging(self) - .mapped('context')) - - commit = self.env['runbot_merge.commit'].search([('sha', '=', head)]) - statuses = json.loads(commit.statuses or '{}') + statuses = json.loads(head.commit_id.statuses or '{}') reason = next(( ctx for ctx, result in statuses.items() if ctx in required_statuses @@ -2004,10 +2034,7 @@ class Stagings(models.Model): if not reason: continue - pr = next(( - pr for pr in self.batch_ids.prs - if pr.repository.name == repo - ), None) + pr = next((pr for pr in self.batch_ids.prs if pr.repository == head.repository_id), None) status = to_status(statuses[reason]) viewmore = '' @@ -2016,7 +2043,7 @@ class Stagings(models.Model): if pr: self.fail("%s%s" % (reason, viewmore), pr) else: - self.fail('%s on %s%s' % (reason, head, viewmore)) + self.fail('%s on %s%s' % (reason, head.commit_id.sha, viewmore)) return False # the staging failed but we don't have a specific culprit, fail @@ -2043,7 +2070,6 @@ class Stagings(models.Model): project = self.target.project_id if self.state == 'success': gh = {repo.name: repo.github() for repo in project.repo_ids.having_branch(self.target)} - staging_heads = json.loads(self.heads) self.env.cr.execute(''' SELECT 1 FROM runbot_merge_pull_requests WHERE id in %s @@ -2053,7 +2079,7 @@ class Stagings(models.Model): with sentry_sdk.start_span(description="merge staging") as span: span.set_tag("staging", self.id) span.set_tag("branch", self.target.name) - self._safety_dance(gh, staging_heads) + self._safety_dance(gh, self.commits) except exceptions.FastForwardError as e: logger.warning( "Could not fast-forward successful staging on %s:%s", @@ -2100,7 +2126,7 @@ class Stagings(models.Model): def is_timed_out(self): return fields.Datetime.from_string(self.timeout_limit) < datetime.datetime.now() - def _safety_dance(self, gh, staging_heads): + def _safety_dance(self, gh, staging_commits: StagingCommits): """ Reverting updates doesn't work if the branches are protected (because a revert is basically a force push). So we can update REPO_A, then fail to update REPO_B for some reason, and we're hosed. @@ -2117,51 +2143,69 @@ class Stagings(models.Model): bad. In that case, wait a bit and retry for now. A more complex strategy (including disabling the branch entirely until somebody has looked at and fixed the issue) might be necessary. - - :returns: the last repo it tried to update (probably the one on which - it failed, if it failed) """ - # FIXME: would make sense for FFE to be richer, and contain the repo name - repo_name = None tmp_target = 'tmp.' 
+ self.target.name
 
         # first force-push the current targets to all tmps
-        for repo_name in staging_heads.keys():
-            if repo_name.endswith('^'):
-                continue
+        for repo_name in staging_commits.mapped('repository_id.name'):
             g = gh[repo_name]
             g.set_ref(tmp_target, g.head(self.target.name))
 
-        # then attempt to FF the tmp to the staging
-        for repo_name, head in staging_heads.items():
-            if repo_name.endswith('^'):
-                continue
-            gh[repo_name].fast_forward(tmp_target, staging_heads.get(repo_name + '^') or head)
+        # then attempt to FF the tmp to the staging commits
+        for c in staging_commits:
+            gh[c.repository_id.name].fast_forward(tmp_target, c.commit_id.sha)
 
         # there is still a race condition here, but it's way
         # lower than "the entire staging duration"...
-        first = True
-        for repo_name, head in staging_heads.items():
-            if repo_name.endswith('^'):
-                continue
-
+        for i, c in enumerate(staging_commits):
             for pause in [0.1, 0.3, 0.5, 0.9, 0]:
                 # last one must be 0/falsy or we lose the exception
                 try:
-                    # if the staging has a $repo^ head, merge that,
-                    # otherwise merge the regular (CI'd) head
-                    gh[repo_name].fast_forward(
+                    gh[c.repository_id.name].fast_forward(
                         self.target.name,
-                        staging_heads.get(repo_name + '^') or head
+                        c.commit_id.sha
                     )
                 except exceptions.FastForwardError:
-                    # The GH API regularly fails us. If the failure does not
-                    # occur on the first repository, retry a few times with a
-                    # little pause.
-                    if not first and pause:
+                    if i and pause:
                         time.sleep(pause)
                         continue
                     raise
                 else:
                     break
-            first = False
-
-        return repo_name
+
+    @api.returns('runbot_merge.stagings')
+    def for_heads(self, *heads):
+        """Returns the staging(s) with all the specified heads. Heads should
+        be unique git oids.
+        """
+        if not heads:
+            return self.browse(())
+
+        joins = ''.join(
+            f'\nJOIN runbot_merge_stagings_heads h{i} ON h{i}.staging_id = s.id'
+            f'\nJOIN runbot_merge_commit c{i} ON c{i}.id = h{i}.commit_id AND c{i}.sha = %s\n'
+            for i in range(len(heads))
+        )
+        self.env.cr.execute(f"SELECT s.id FROM runbot_merge_stagings s {joins}", heads)
+        stagings = self.browse(id for [id] in self.env.cr.fetchall())
+        stagings.check_access_rights('read')
+        stagings.check_access_rule('read')
+        return stagings
+
+    @api.returns('runbot_merge.stagings')
+    def for_commits(self, *heads):
+        """Returns the staging(s) with all the specified commits (heads which
+        have actually been merged). Commits should be unique git oids.
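+
+        For instance, to find the staging which merged a pair of commits
+        (illustrative oids):
+
+            stagings = self.env['runbot_merge.stagings'].for_commits(
+                '1af41dcbbf41bd32cff2e1b04e52e2a4f6a079b8',
+                '76edd67d61b3f5464b6e9a155e9a1d223e296a63',
+            )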
+ """ + if not heads: + return self.browse(()) + + joins = ''.join( + f'\nJOIN runbot_merge_stagings_commits h{i} ON h{i}.staging_id = s.id' + f'\nJOIN runbot_merge_commit c{i} ON c{i}.id = h{i}.commit_id AND c{i}.sha = %s\n' + for i in range(len(heads)) + ) + self.env.cr.execute(f"SELECT s.id FROM runbot_merge_stagings s {joins}", heads) + stagings = self.browse(id for [id] in self.env.cr.fetchall()) + stagings.check_access_rights('read') + stagings.check_access_rule('read') + return stagings class Split(models.Model): _name = _description = 'runbot_merge.split' diff --git a/runbot_merge/security/ir.model.access.csv b/runbot_merge/security/ir.model.access.csv index 006f59de..5b2e102b 100644 --- a/runbot_merge/security/ir.model.access.csv +++ b/runbot_merge/security/ir.model.access.csv @@ -11,6 +11,8 @@ access_runbot_merge_pull_requests_admin,Admin access to PR,model_runbot_merge_pu access_runbot_merge_pull_requests_tagging_admin,Admin access to tagging,model_runbot_merge_pull_requests_tagging,runbot_merge.group_admin,1,1,1,1 access_runbot_merge_commit_admin,Admin access to commits,model_runbot_merge_commit,runbot_merge.group_admin,1,1,1,1 access_runbot_merge_stagings_admin,Admin access to stagings,model_runbot_merge_stagings,runbot_merge.group_admin,1,1,1,1 +access_runbot_merge_stagings_heads_admin,Admin access to staging heads,model_runbot_merge_stagings_heads,runbot_merge.group_admin,1,1,1,1 +access_runbot_merge_stagings_commits_admin,Admin access to staging commits,model_runbot_merge_stagings_commits,runbot_merge.group_admin,1,1,1,1 access_runbot_merge_stagings_cancel_admin,Admin access to cancelling stagings,model_runbot_merge_stagings_cancel,runbot_merge.group_admin,1,1,1,1 access_runbot_merge_split_admin,Admin access to splits,model_runbot_merge_split,runbot_merge.group_admin,1,1,1,1 access_runbot_merge_batch_admin,Admin access to batches,model_runbot_merge_batch,runbot_merge.group_admin,1,1,1,1 diff --git a/runbot_merge/tests/test_multirepo.py b/runbot_merge/tests/test_multirepo.py index b94a1ec9..58c642fa 100644 --- a/runbot_merge/tests/test_multirepo.py +++ b/runbot_merge/tests/test_multirepo.py @@ -373,14 +373,29 @@ def test_sub_match(env, project, repo_a, repo_b, repo_c, config): a_staging = repo_a.commit('staging.master') b_staging = repo_b.commit('staging.master') c_staging = repo_c.commit('staging.master') - assert json.loads(st.heads) == { - repo_a.name: a_staging.id, - repo_a.name + '^': a_staging.parents[0], - repo_b.name: b_staging.id, - repo_b.name + '^': b_staging.id, - repo_c.name: c_staging.id, - repo_c.name + '^': c_staging.id, - } + assert sorted(st.head_ids.mapped('sha')) == sorted([ + a_staging.id, + b_staging.id, + c_staging.id, + ]) + s = env['runbot_merge.stagings'].for_heads( + a_staging.id, + b_staging.id, + c_staging.id, + ) + assert s == list(st.ids) + + assert sorted(st.commit_ids.mapped('sha')) == sorted([ + a_staging.parents[0], + b_staging.id, + c_staging.id, + ]) + s = env['runbot_merge.stagings'].for_commits( + a_staging.parents[0], + b_staging.id, + c_staging.id, + ) + assert s == list(st.ids) def test_merge_fail(env, project, repo_a, repo_b, users, config): """ In a matched-branch scenario, if merging in one of the linked repos