diff --git a/runbot_merge/models/pull_requests.py b/runbot_merge/models/pull_requests.py
index 50323922..f3868e6b 100644
--- a/runbot_merge/models/pull_requests.py
+++ b/runbot_merge/models/pull_requests.py
@@ -62,186 +62,13 @@ class Project(models.Model):
     )
 
     def _check_progress(self):
-        logger = _logger.getChild('cron')
-        Batch = self.env['runbot_merge.batch']
-        PRs = self.env['runbot_merge.pull_requests']
         for project in self.search([]):
-            gh = {repo.name: repo.github() for repo in project.repo_ids}
-            # check status of staged PRs
             for staging in project.mapped('branch_ids.active_staging_id'):
-                logger.info(
-                    "Checking active staging %s (state=%s)",
-                    staging, staging.state
-                )
-                if staging.state == 'success':
-                    repo_name = None
-                    staging_heads = json.loads(staging.heads)
-                    try:
-                        # reverting updates doesn't work if the branches are
-                        # protected (because a revert is basically a force
-                        # push), instead use the tmp branch as a dry-run
-                        tmp_target = 'tmp.' + staging.target.name
-                        # first force-push the current targets to all tmps
-                        for repo_name in staging_heads.keys():
-                            if repo_name.endswith('^'):
-                                continue
-                            g = gh[repo_name]
-                            g.set_ref(tmp_target, g.head(staging.target.name))
+                staging.check_status()
 
-                        # then attempt to FF the tmp to the staging
-                        for repo_name, head in staging_heads.items():
-                            if repo_name.endswith('^'):
-                                continue
-                            gh[repo_name].fast_forward(tmp_target, staging_heads.get(repo_name + '^') or head)
-
-                        # there is still a race condition here, but it's way
-                        # lower than "the entire staging duration"...
-                        for repo_name, head in staging_heads.items():
-                            if repo_name.endswith('^'):
-                                continue
-
-                            # if the staging has a $repo^ head, merge that,
-                            # otherwise merge the regular (CI'd) head
-                            gh[repo_name].fast_forward(
-                                staging.target.name,
-                                staging_heads.get(repo_name + '^') or head
-                            )
-                    except exceptions.FastForwardError as e:
-                        logger.warning(
-                            "Could not fast-forward successful staging on %s:%s",
-                            repo_name, staging.target.name,
-                            exc_info=True
-                        )
-                        staging.write({
-                            'state': 'ff_failed',
-                            'reason': str(e.__cause__ or e.__context__ or '')
-                        })
-                    else:
-                        prs = staging.mapped('batch_ids.prs')
-                        logger.info(
-                            "%s FF successful, marking %s as merged",
-                            staging, prs
-                        )
-                        prs.write({'state': 'merged'})
-                        for pr in prs:
-                            # FIXME: this is the staging head rather than the actual merge commit for the PR
-                            staging_head = staging_heads.get(pr.repository.name + '^') or staging_heads[pr.repository.name]
-                            gh[pr.repository.name].close(pr.number, 'Merged in {}'.format(staging_head))
-                    finally:
-                        staging.batch_ids.write({'active': False})
-                        staging.write({'active': False})
-                elif staging.state == 'failure' or project.is_timed_out(staging):
-                    staging.try_splitting()
-                # else let flow
-
-            # check for stageable branches/prs
             for branch in project.branch_ids:
-                logger.info(
-                    "Checking %s (%s) for staging: %s, skip? %s",
%s", - branch, branch.name, - branch.active_staging_id, - bool(branch.active_staging_id) - ) - if branch.active_staging_id: - continue + branch.try_staging() - # noinspection SqlResolve - self.env.cr.execute(""" - SELECT - min(pr.priority) as priority, - array_agg(pr.id) AS match - FROM runbot_merge_pull_requests pr - LEFT JOIN runbot_merge_batch batch ON pr.batch_id = batch.id AND batch.active - WHERE pr.target = %s - -- exclude terminal states (so there's no issue when - -- deleting branches & reusing labels) - AND pr.state != 'merged' - AND pr.state != 'closed' - GROUP BY pr.label - HAVING (bool_or(pr.priority = 0) AND NOT bool_or(pr.state = 'error')) - OR bool_and(pr.state = 'ready') - ORDER BY min(pr.priority), min(pr.id) - """, [branch.id]) - # result: [(priority, [(repo_id, pr_id) for repo in repos] - rows = self.env.cr.fetchall() - priority = rows[0][0] if rows else -1 - if priority == 0: - # p=0 take precedence over all else - batched_prs = [ - PRs.browse(pr_ids) - for _, pr_ids in takewhile(lambda r: r[0] == priority, rows) - ] - elif branch.split_ids: - split_ids = branch.split_ids[0] - logger.info("Found split of PRs %s, re-staging", split_ids.mapped('batch_ids.prs')) - batched_prs = [batch.prs for batch in split_ids.batch_ids] - split_ids.unlink() - elif rows: - # p=1 or p=2 - batched_prs = [PRs.browse(pr_ids) for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)] - else: - continue - - staged = Batch - meta = {repo: {} for repo in project.repo_ids} - for repo, it in meta.items(): - gh = it['gh'] = repo.github() - it['head'] = gh.head(branch.name) - # create tmp staging branch - gh.set_ref('tmp.{}'.format(branch.name), it['head']) - - batch_limit = project.batch_limit - for batch in batched_prs: - if len(staged) >= batch_limit: - break - staged |= Batch.stage(meta, batch) - - if staged: - heads = {} - for repo, it in meta.items(): - tree = it['gh'].commit(it['head'])['tree'] - # ensures staging branches are unique and always - # rebuilt - r = base64.b64encode(os.urandom(12)).decode('ascii') - dummy_head = it['gh']('post', 'git/commits', json={ - 'message': 'force rebuild\n\nuniquifier: %s' % r, - 'tree': tree['sha'], - 'parents': [it['head']], - }).json() - - # $repo is the head to check, $repo^ is the head to merge - heads[repo.name + '^'] = it['head'] - heads[repo.name] = dummy_head['sha'] - - # create actual staging object - st = self.env['runbot_merge.stagings'].create({ - 'target': branch.id, - 'batch_ids': [(4, batch.id, 0) for batch in staged], - 'heads': json.dumps(heads) - }) - # create staging branch from tmp - for r in project.repo_ids: - it = meta[r] - _logger.info( - "%s: create staging for %s:%s at %s", - project.name, r.name, branch.name, - heads[r.name] - ) - it['gh'].set_ref('staging.{}'.format(branch.name), heads[r.name]) - time.sleep(STAGING_SLEEP) - - # creating the staging doesn't trigger a write on the prs - # and thus the ->staging taggings, so do that by hand - Tagging = self.env['runbot_merge.pull_requests.tagging'] - for pr in st.mapped('batch_ids.prs'): - Tagging.create({ - 'pull_request': pr.number, - 'repository': pr.repository.id, - 'state_from': pr._tagstate, - 'state_to': 'staged', - }) - - logger.info("Created staging %s (%s)", st, staged) # I have no idea why this is necessary for tests to pass, the only # DB update done not through the ORM is when receiving a notification # that a PR has been closed @@ -391,6 +218,123 @@ class Branch(models.Model): for b in self: b.active_staging_id = b.staging_ids + def try_staging(self): + """ Tries 
+        have one. Returns None if the branch already has a staging or there
+        is nothing to stage, the newly created staging otherwise.
+        """
+        logger = _logger.getChild('cron')
+
+        logger.info(
+            "Checking %s (%s) for staging: %s, skip? %s",
+            self, self.name,
+            self.active_staging_id,
+            bool(self.active_staging_id)
+        )
+        if self.active_staging_id:
+            return
+
+        PRs = self.env['runbot_merge.pull_requests']
+
+        # noinspection SqlResolve
+        self.env.cr.execute("""
+        SELECT
+            min(pr.priority) as priority,
+            array_agg(pr.id) AS match
+        FROM runbot_merge_pull_requests pr
+        LEFT JOIN runbot_merge_batch batch ON pr.batch_id = batch.id AND batch.active
+        WHERE pr.target = %s
+        -- exclude terminal states (so there's no issue when
+        -- deleting branches & reusing labels)
+        AND pr.state != 'merged'
+        AND pr.state != 'closed'
+        GROUP BY pr.label
+        HAVING (bool_or(pr.priority = 0) AND NOT bool_or(pr.state = 'error'))
+            OR bool_and(pr.state = 'ready')
+        ORDER BY min(pr.priority), min(pr.id)
+        """, [self.id])
+        # result: [(priority, [(repo_id, pr_id) for repo in repos]
+        rows = self.env.cr.fetchall()
+        priority = rows[0][0] if rows else -1
+        if priority == 0:
+            # p=0 take precedence over all else
+            batched_prs = [PRs.browse(pr_ids) for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
+        elif self.split_ids:
+            split_ids = self.split_ids[0]
+            logger.info("Found split of PRs %s, re-staging", split_ids.mapped('batch_ids.prs'))
+            batched_prs = [batch.prs for batch in split_ids.batch_ids]
+            split_ids.unlink()
+        elif rows:
+            # p=1 or p=2
+            batched_prs = [PRs.browse(pr_ids) for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
+        else:
+            return
+
+        Batch = self.env['runbot_merge.batch']
+        staged = Batch
+        meta = {repo: {} for repo in self.project_id.repo_ids}
+        for repo, it in meta.items():
+            gh = it['gh'] = repo.github()
+            it['head'] = gh.head(self.name)
+            # create tmp staging branch
+            gh.set_ref('tmp.{}'.format(self.name), it['head'])
+
+        batch_limit = self.project_id.batch_limit
+        for batch in batched_prs:
+            if len(staged) >= batch_limit:
+                break
+            staged |= Batch.stage(meta, batch)
+
+        if not staged:
+            return
+
+        heads = {}
+        for repo, it in meta.items():
+            tree = it['gh'].commit(it['head'])['tree']
+            # ensures staging branches are unique and always
+            # rebuilt
+            r = base64.b64encode(os.urandom(12)).decode('ascii')
+            dummy_head = it['gh']('post', 'git/commits', json={
+                'message': 'force rebuild\n\nuniquifier: %s' % r,
+                'tree': tree['sha'],
+                'parents': [it['head']],
+            }).json()
+
+            # $repo is the head to check, $repo^ is the head to merge
+            heads[repo.name + '^'] = it['head']
+            heads[repo.name] = dummy_head['sha']
+
+        # create actual staging object
+        st = self.env['runbot_merge.stagings'].create({
+            'target': self.id,
+            'batch_ids': [(4, batch.id, 0) for batch in staged],
+            'heads': json.dumps(heads)
+        })
+        # create staging branch from tmp
+        for r in self.project_id.repo_ids:
+            it = meta[r]
+            _logger.info(
+                "%s: create staging for %s:%s at %s",
+                self.project_id.name, r.name, self.name,
+                heads[r.name]
+            )
+            it['gh'].set_ref('staging.{}'.format(self.name), heads[r.name])
+            time.sleep(STAGING_SLEEP)
+
+        # creating the staging doesn't trigger a write on the prs
+        # and thus the ->staging taggings, so do that by hand
+        Tagging = self.env['runbot_merge.pull_requests.tagging']
+        for pr in st.mapped('batch_ids.prs'):
+            Tagging.create({
+                'pull_request': pr.number,
+                'repository': pr.repository.id,
+                'state_from': pr._tagstate,
+                'state_to': 'staged',
+            })
+
+        logger.info("Created staging %s (%s)", st, staged)
+        return st
+
 class PullRequests(models.Model):
     _name = 'runbot_merge.pull_requests'
     _order = 'number desc'
@@ -952,6 +896,83 @@ class Stagings(models.Model):
 
         return False
 
+    def check_status(self):
+        """
+        Checks the status of an active staging:
+        * merges it if successful
+        * splits it if failed (or timed out) and more than 1 batch
+        * marks the PRs as failed otherwise
+        * ignores if pending (or cancelled or ff_failed but those should also
+          be disabled)
+        """
+        logger = _logger.getChild('cron')
+        if not self.active:
+            logger.info("Staging %s is not active, ignoring status check", self)
+            return
+
+        logger.info("Checking active staging %s (state=%s)", self, self.state)
+        project = self.target.project_id
+        if self.state == 'success':
+            gh = {repo.name: repo.github() for repo in project.repo_ids}
+            repo_name = None
+            staging_heads = json.loads(self.heads)
+            try:
+                # reverting updates doesn't work if the branches are
+                # protected (because a revert is basically a force
+                # push), instead use the tmp branch as a dry-run
+                tmp_target = 'tmp.' + self.target.name
+                # first force-push the current targets to all tmps
+                for repo_name in staging_heads.keys():
+                    if repo_name.endswith('^'):
+                        continue
+                    g = gh[repo_name]
+                    g.set_ref(tmp_target, g.head(self.target.name))
+
+                # then attempt to FF the tmp to the staging
+                for repo_name, head in staging_heads.items():
+                    if repo_name.endswith('^'):
+                        continue
+                    gh[repo_name].fast_forward(tmp_target, staging_heads.get(repo_name + '^') or head)
+
+                # there is still a race condition here, but it's way
+                # lower than "the entire staging duration"...
+                for repo_name, head in staging_heads.items():
+                    if repo_name.endswith('^'):
+                        continue
+
+                    # if the staging has a $repo^ head, merge that,
+                    # otherwise merge the regular (CI'd) head
+                    gh[repo_name].fast_forward(
+                        self.target.name,
+                        staging_heads.get(repo_name + '^') or head
+                    )
+            except exceptions.FastForwardError as e:
+                logger.warning(
+                    "Could not fast-forward successful staging on %s:%s",
+                    repo_name, self.target.name,
+                    exc_info=True
+                )
+                self.write({
+                    'state': 'ff_failed',
+                    'reason': str(e.__cause__ or e.__context__ or '')
+                })
+            else:
+                prs = self.mapped('batch_ids.prs')
+                logger.info(
+                    "%s FF successful, marking %s as merged",
+                    self, prs
+                )
+                prs.write({'state': 'merged'})
+                for pr in prs:
+                    # FIXME: this is the staging head rather than the actual merge commit for the PR
+                    staging_head = staging_heads.get(pr.repository.name + '^') or staging_heads[pr.repository.name]
+                    gh[pr.repository.name].close(pr.number, 'Merged in {}'.format(staging_head))
+            finally:
+                self.batch_ids.write({'active': False})
+                self.write({'active': False})
+        elif self.state == 'failure' or project.is_timed_out(self):
+            self.try_splitting()
+
 class Split(models.Model):
     _name = 'runbot_merge.split'
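
Note on the fast-forward dance in check_status(): it can be read independently of the ORM. Below is a minimal sketch, assuming per-repo `gh` clients whose head/set_ref/fast_forward methods behave like the wrappers used in the patch; the standalone merge_staging function and its signature are illustrative only, not part of this change.

    def merge_staging(gh, target, heads):
        # gh: {repo_name: client}; heads: the staging's decoded `heads` map,
        # where heads[name] is the CI'd dummy commit and heads[name + '^']
        # (when present) is the real head to merge.
        tmp = 'tmp.' + target
        repos = [n for n in heads if not n.endswith('^')]
        # protected branches forbid force-pushes, so a botched fast-forward of
        # the real target could not be reverted; rehearse on the tmp branch
        for name in repos:
            g = gh[name]
            g.set_ref(tmp, g.head(target))  # force-push current target onto tmp
        for name in repos:
            gh[name].fast_forward(tmp, heads.get(name + '^') or heads[name])
        # every repo fast-forwarded cleanly in rehearsal, so do it for real;
        # still racy, but the window is far smaller than a whole staging
        for name in repos:
            gh[name].fast_forward(target, heads.get(name + '^') or heads[name])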
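
Note on the staging query in try_staging(): the HAVING clause carries the whole batch-selection rule. Restated as plain Python (a sketch; the stageable helper and its (priority, state) pair input are invented here for illustration):

    def stageable(prs):
        # prs: [(priority, state), ...] for all open PRs sharing one label
        urgent = any(p == 0 for p, _ in prs) and all(s != 'error' for _, s in prs)
        ready = all(s == 'ready' for _, s in prs)
        return urgent or ready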