From cab683ae0f7aed2fa49edbb6823af4805aa18f6b Mon Sep 17 00:00:00 2001
From: Xavier Morel
Date: Thu, 21 Jun 2018 09:55:14 +0200
Subject: [PATCH] [IMP] runbot_merge: remove "sync PRs", fetch unknown PRs

The old "sync PRs" system is turning out to be a bust: while it
originally worked fine, these days the v4 API's performance seems to
have degraded significantly, to the point that fetching all 15k PRs in
pages of 100 simply blows up after a few hundred or thousand pages.

Instead, add a table of PRs to sync: if we get notified about a
"compatible" PR (enabled repo & target) which we don't know of, create
an entry in a "fetch jobs" table, and let a cron handle fetching the PR
then fetching/applying all the relevant metadata (statuses, review
comments and reviews).

Also change the indexing of Commit(sha) and PR(head) to hash, as btree
indexes are not really sensible for such content: the ordering is
unhelpful and the index locality is awful by design/definition.
---
 runbot_merge/controllers/__init__.py       |  75 +++++-----
 runbot_merge/data/merge_cron.xml           |  10 ++
 runbot_merge/github.py                     | 134 ++++-------------
 runbot_merge/models/pull_requests.py       | 165 +++++++++++----------
 runbot_merge/tests/fake_github/__init__.py |  88 ++++++++++-
 runbot_merge/tests/remote.py               |  13 +-
 runbot_merge/tests/test_basic.py           |  47 +++++-
 runbot_merge/views/mergebot.xml            |   4 -
 8 files changed, 298 insertions(+), 238 deletions(-)

diff --git a/runbot_merge/controllers/__init__.py b/runbot_merge/controllers/__init__.py
index 0a8b52c9..a59910aa 100644
--- a/runbot_merge/controllers/__init__.py
+++ b/runbot_merge/controllers/__init__.py
@@ -18,23 +18,26 @@ class MergebotController(Controller):
         event = req.headers['X-Github-Event']
 
         c = EVENTS.get(event)
-        if c:
-            repo = request.jsonrequest['repository']['full_name']
-            secret = request.env(user=1)['runbot_merge.repository'].search([
-                ('name', '=', repo),
-            ]).project_id.secret
-            if secret:
-                signature = 'sha1=' + hmac.new(secret.encode('ascii'), req.get_data(), hashlib.sha1).hexdigest()
-                if not hmac.compare_digest(signature, req.headers.get('X-Hub-Signature', '')):
-                    _logger.warn("Ignored hook with incorrect signature %s",
-                                 req.headers.get('X-Hub-Signature'))
-                    return werkzeug.exceptions.Forbidden()
+        if not c:
+            _logger.warn('Unknown event %s', event)
+            return 'Unknown event {}'.format(event)
 
-            return c(request.jsonrequest)
-        _logger.warn('Unknown event %s', event)
-        return 'Unknown event {}'.format(event)
+        repo = request.jsonrequest['repository']['full_name']
+        env = request.env(user=1)
 
-def handle_pr(event):
+        secret = env['runbot_merge.repository'].search([
+            ('name', '=', repo),
+        ]).project_id.secret
+        if secret:
+            signature = 'sha1=' + hmac.new(secret.encode('ascii'), req.get_data(), hashlib.sha1).hexdigest()
+            if not hmac.compare_digest(signature, req.headers.get('X-Hub-Signature', '')):
+                _logger.warn("Ignored hook with incorrect signature %s",
+                             req.headers.get('X-Hub-Signature'))
+                return werkzeug.exceptions.Forbidden()
+
+        return c(env, request.jsonrequest)
+
+def handle_pr(env, event):
     if event['action'] in [
         'assigned', 'unassigned', 'review_requested', 'review_request_removed',
         'labeled', 'unlabeled'
@@ -47,7 +50,6 @@ def handle_pr(event):
         )
         return 'Ignoring'
 
-    env = request.env(user=1)
     pr = event['pull_request']
     r = pr['base']['repo']['full_name']
     b = pr['base']['ref']
@@ -93,7 +95,7 @@ def handle_pr(event):
 
         # retargeting from un-managed => create
        if not source_branch:
-            return handle_pr(dict(event, action='opened'))
+            return handle_pr(env, dict(event, action='opened'))
 
         updates = {}
         if source_branch != branch:
@@ -135,10 +137,10 @@
         })
         return "Tracking PR as {}".format(pr_obj.id)
 
-    pr_obj = find(branch)
+    pr_obj = env['runbot_merge.pull_requests']._get_or_schedule(r, pr['number'])
     if not pr_obj:
-        _logger.warn("webhook %s on unknown PR %s:%s", event['action'], repo.name, pr['number'])
-        return "Unknown PR {}:{}".format(repo.name, pr['number'])
+        _logger.warn("webhook %s on unknown PR %s:%s, scheduled fetch", event['action'], repo.name, pr['number'])
+        return "Unknown PR {}:{}, scheduling fetch".format(repo.name, pr['number'])
     if event['action'] == 'synchronize':
         if pr_obj.head == pr['head']['sha']:
             return 'No update to pr head'
@@ -178,13 +180,13 @@
     _logger.info("Ignoring event %s on PR %s", event['action'], pr['number'])
     return "Not handling {} yet".format(event['action'])
 
-def handle_status(event):
+def handle_status(env, event):
     _logger.info(
         'status %s:%s on commit %s',
         event['context'], event['state'],
         event['sha'],
     )
-    Commits = request.env(user=1)['runbot_merge.commit']
+    Commits = env['runbot_merge.commit']
     c = Commits.search([('sha', '=', event['sha'])])
     if c:
         c.statuses = json.dumps({
@@ -199,7 +201,7 @@
 
     return 'ok'
 
-def handle_comment(event):
+def handle_comment(env, event):
     if 'pull_request' not in event['issue']:
         return "issue comment, ignoring"
 
@@ -211,30 +213,31 @@
         event['comment']['body'],
     )
 
-    env = request.env(user=1)
     partner = env['res.partner'].search([('github_login', '=', event['sender']['login']),])
-    pr = env['runbot_merge.pull_requests'].search([
-        ('number', '=', event['issue']['number']),
-        ('repository.name', '=', event['repository']['full_name']),
-    ])
     if not partner:
         _logger.info("ignoring comment from %s: not in system", event['sender']['login'])
         return 'ignored'
 
+    pr = env['runbot_merge.pull_requests']._get_or_schedule(
+        event['repository']['full_name'],
+        event['issue']['number'],
+    )
+    if not pr:
+        return "Unknown PR, scheduling fetch"
+
     return pr._parse_commands(partner, event['comment']['body'])
 
-def handle_review(event):
-    env = request.env(user=1)
-
+def handle_review(env, event):
     partner = env['res.partner'].search([('github_login', '=', event['review']['user']['login'])])
     if not partner:
         _logger.info('ignoring comment from %s: not in system', event['review']['user']['login'])
         return 'ignored'
 
-    pr = env['runbot_merge.pull_requests'].search([
-        ('number', '=', event['pull_request']['number']),
-        ('repository.name', '=', event['repository']['full_name'])
-    ])
+    pr = env['runbot_merge.pull_requests']._get_or_schedule(
+        event['repository']['full_name'],
+        event['pull_request']['number'],
+    )
+    if not pr:
+        return "Unknown PR, scheduling fetch"
 
     firstline = ''
     state = event['review']['state'].lower()
@@ -245,7 +248,7 @@
 
     return pr._parse_commands(partner, firstline + event['review']['body'])
 
-def handle_ping(event):
+def handle_ping(env, event):
     print("Got ping! {}".format(event['zen']))
{}".format(event['zen'])) return "pong" diff --git a/runbot_merge/data/merge_cron.xml b/runbot_merge/data/merge_cron.xml index 3f864d48..cd001c2d 100644 --- a/runbot_merge/data/merge_cron.xml +++ b/runbot_merge/data/merge_cron.xml @@ -9,4 +9,14 @@ -1 + + Check for PRs to fetch + + code + model._check_fetch(True) + 1 + minutes + -1 + + diff --git a/runbot_merge/github.py b/runbot_merge/github.py index e4fdb934..b2eaf696 100644 --- a/runbot_merge/github.py +++ b/runbot_merge/github.py @@ -1,10 +1,10 @@ import collections import functools +import itertools import logging import requests -from odoo.exceptions import UserError from . import exceptions _logger = logging.getLogger(__name__) @@ -110,113 +110,31 @@ class GH(object): self.set_ref(dest, c['sha']) return c - # -- + # fetch various bits of issues / prs to load them + def pr(self, number): + return ( + self('get', 'issues/{}'.format(number)).json(), + self('get', 'pulls/{}'.format(number)).json() + ) - def prs(self): - cursor = None - owner, name = self._repo.split('/') - while True: - r = self._session.post('{}/graphql'.format(self._url), json={ - 'query': PR_QUERY, - 'variables': { - 'owner': owner, - 'name': name, - 'cursor': cursor, - } - }) - response = r.json() - if 'data' not in response: - raise UserError('\n'.join(e['message'] for e in response.get('errors', map(str, [r.status_code, r.reason, r.text])))) + def comments(self, number): + for page in itertools.count(1): + r = self('get', 'issues/{}/comments?page={}'.format(number, page)) + yield from r.json() + if not r.links.get('next'): + return - result = response['data']['repository']['pullRequests'] - for pr in result['nodes']: - statuses = into(pr, 'headRef.target.status.contexts') or [] + def reviews(self, number): + for page in itertools.count(1): + r = self('get', 'pulls/{}/reviews?page={}'.format(number, page)) + yield from r.json() + if not r.links.get('next'): + return - author = into(pr, 'author.login') or into(pr, 'headRepositoryOwner.login') - source = into(pr, 'headRepositoryOwner.login') or into(pr, 'author.login') - label = source and "{}:{}".format(source, pr['headRefName']) - yield { - 'number': pr['number'], - 'title': pr['title'], - 'body': pr['body'], - 'head': { - 'ref': pr['headRefName'], - 'sha': pr['headRefOid'], - # headRef may be null if the pr branch was ?deleted? - # (mostly closed PR concerns?) - 'statuses': { - c['context']: c['state'] - for c in statuses - }, - 'label': label, - }, - 'state': pr['state'].lower(), - 'user': {'login': author}, - 'base': { - 'ref': pr['baseRefName'], - 'repo': { - 'full_name': pr['repository']['nameWithOwner'], - } - }, - 'commits': pr['commits']['totalCount'], - } - - if result['pageInfo']['hasPreviousPage']: - cursor = result['pageInfo']['startCursor'] - else: - break -def into(d, path): - return functools.reduce( - lambda v, segment: v and v.get(segment), - path.split('.'), - d - ) - -PR_QUERY = """ -query($owner: String!, $name: String!, $cursor: String) { - rateLimit { remaining } - repository(owner: $owner, name: $name) { - pullRequests(last: 100, before: $cursor) { - pageInfo { startCursor hasPreviousPage } - nodes { - author { # optional - login - } - number - title - body - state - repository { nameWithOwner } - baseRefName - headRefOid - headRepositoryOwner { # optional - login - } - headRefName - headRef { # optional - target { - ... 
-              status {
-                contexts {
-                  context
-                  state
-                }
-              }
-            }
-          }
-        }
-        commits { totalCount }
-        #comments(last: 100) {
-        #  nodes {
-        #    author {
-        #      login
-        #    }
-        #    body
-        #    bodyText
-        #  }
-        #}
-      }
-    }
-  }
-}
-"""
+    def statuses(self, h):
+        r = self('get', 'commits/{}/status'.format(h)).json()
+        return [{
+            'sha': r['sha'],
+            'context': s['context'],
+            'state': s['state'],
+        } for s in r['statuses']]
diff --git a/runbot_merge/models/pull_requests.py b/runbot_merge/models/pull_requests.py
index 67325ffb..14eaea8e 100644
--- a/runbot_merge/models/pull_requests.py
+++ b/runbot_merge/models/pull_requests.py
@@ -10,7 +10,7 @@ from itertools import takewhile
 from odoo import api, fields, models, tools
 from odoo.exceptions import ValidationError
 
-from .. import github, exceptions
+from .. import github, exceptions, controllers
 
 _logger = logging.getLogger(__name__)
 
 class Project(models.Model):
@@ -234,85 +234,25 @@ class Project(models.Model):
     def is_timed_out(self, staging):
         return fields.Datetime.from_string(staging.staged_at) + datetime.timedelta(minutes=self.ci_timeout) < datetime.datetime.now()
 
-    def sync_prs(self):
-        _logger.info("Synchronizing PRs for %s", self.name)
-        Commits = self.env['runbot_merge.commit']
-        PRs = self.env['runbot_merge.pull_requests']
-        Partners = self.env['res.partner']
-        branches = {
-            b.name: b
-            for b in self.branch_ids
-        }
-        authors = {
-            p.github_login: p
-            for p in Partners.search([])
-            if p.github_login
-        }
-        for repo in self.repo_ids:
-            gh = repo.github()
-            created = 0
-            ignored_targets = collections.Counter()
-            prs = {
-                pr.number: pr
-                for pr in PRs.search([
-                    ('repository', '=', repo.id),
-                ])
-            }
-            for i, pr in enumerate(gh.prs()):
-                message = "{}\n\n{}".format(pr['title'].strip(), pr['body'].strip())
-                existing = prs.get(pr['number'])
-                target = pr['base']['ref']
-                if existing:
-                    if target not in branches:
-                        _logger.info("PR %d retargeted to non-managed branch %s, deleting", pr['number'],
-                                     target)
-                        ignored_targets.update([target])
-                        existing.write({'active': False})
-                    else:
-                        if message != existing.message:
-                            _logger.info("Updating PR %d ({%s} != {%s})", pr['number'], existing.message, message)
-                            existing.message = message
-                    continue
+    def _check_fetch(self, commit=False):
+        """
+        :param bool commit: commit after each fetch has been executed
+        """
+        while True:
+            f = self.env['runbot_merge.fetch_job'].search([], limit=1)
+            if not f:
+                return
 
-                # not for a selected target => skip
-                if target not in branches:
-                    ignored_targets.update([target])
-                    continue
+            repo = self.env['runbot_merge.repository'].search([('name', '=', f.repository)])
+            if repo:
+                repo._load_pr(f.number)
+            else:
+                _logger.warn("Fetch job for unknown repository %s, disabling & skipping", f.repository)
 
-                # old PR, source repo may have been deleted, ignore
-                if not pr['head']['label']:
-                    _logger.info('ignoring PR %d: no label', pr['number'])
-                    continue
-
-                login = pr['user']['login']
-                # no author on old PRs, account deleted
-                author = authors.get(login, Partners)
-                if login and not author:
-                    author = authors[login] = Partners.create({
-                        'name': login,
-                        'github_login': login,
-                    })
-                head = pr['head']['sha']
-                PRs.create({
-                    'number': pr['number'],
-                    'label': pr['head']['label'],
-                    'author': author.id,
-                    'target': branches[target].id,
-                    'repository': repo.id,
-                    'head': head,
-                    'squash': pr['commits'] == 1,
-                    'message': message,
-                    'state': 'opened' if pr['state'] == 'open'
-                             else 'merged' if pr.get('merged')
-                             else 'closed'
-                })
-                c = Commits.search([('sha', '=', head)]) or Commits.create({'sha': head})
-                c.statuses = json.dumps(pr['head']['statuses'])
-
-                created += 1
-            _logger.info("%d new prs in %s", created, repo.name)
-        _logger.info('%d ignored PRs for un-managed targets: (%s)', sum(ignored_targets.values()), dict(ignored_targets))
-        return False
+            # commit after each fetched PR
+            f.active = False
+            if commit:
+                self.env.cr.commit()
 
 class Repository(models.Model):
     _name = 'runbot_merge.repository'
@@ -329,6 +269,33 @@
             self._cr, 'runbot_merge_unique_repo', self._table, ['name'])
         return res
 
+    def _load_pr(self, number):
+        gh = self.github()
+
+        # fetch PR object and handle as *opened*
+        issue, pr = gh.pr(number)
+        controllers.handle_pr(self.env, {
+            'action': 'opened',
+            'pull_request': pr,
+        })
+        for st in gh.statuses(pr['head']['sha']):
+            controllers.handle_status(self.env, st)
+        # get and handle all comments
+        for comment in gh.comments(number):
+            controllers.handle_comment(self.env, {
+                'issue': issue,
+                'sender': comment['user'],
+                'comment': comment,
+                'repository': {'full_name': self.name},
+            })
+        # get and handle all reviews
+        for review in gh.reviews(number):
+            controllers.handle_review(self.env, {
+                'review': review,
+                'pull_request': pr,
+                'repository': {'full_name': self.name},
+            })
+
 class Branch(models.Model):
     _name = 'runbot_merge.branch'
 
@@ -380,7 +347,7 @@
     number = fields.Integer(required=True, index=True)
     author = fields.Many2one('res.partner')
-    head = fields.Char(required=True, index=True)
+    head = fields.Char(required=True)
     label = fields.Char(
         required=True, index=True,
         help="Label of the source branch (owner:branchname), used for "
@@ -415,6 +382,19 @@
         for r in self:
             r.batch_id = r.batch_ids.filtered(lambda b: b.active)[:1]
 
+    def _get_or_schedule(self, repo_name, number):
+        pr = self.search([
+            ('repository.name', '=', repo_name),
+            ('number', '=', number,)
+        ])
+        if pr:
+            return pr
+
+        Fetch = self.env['runbot_merge.fetch_job']
+        if Fetch.search([('repository', '=', repo_name), ('number', '=', number)]):
+            return
+        Fetch.create({'repository': repo_name, 'number': number})
+
     def _parse_command(self, commandstring):
         m = re.match(r'(\w+)(?:([+-])|=(.*))?', commandstring)
         if not m:
@@ -456,6 +436,8 @@
         sets the priority to normal (2), pressing (1) or urgent (0).
         Lower-priority PRs are selected first and batched together.
         """
+        assert self, "parsing commands must be executed in an actual PR"
+
         is_admin = (author.reviewer and self.author != author) or (author.self_reviewer and self.author == author)
         is_reviewer = is_admin or self in author.delegate_reviewer
         # TODO: should delegate reviewers be able to retry PRs?
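
Taken together, the pieces above form a small work queue: _get_or_schedule
(webhook side) enqueues an unknown (repository, number) pair at most once,
and the cron-driven _check_fetch drains the queue through _load_pr,
committing after each PR so a failure mid-run does not throw away what was
already fetched. Below is a minimal sketch of that flow in plain Python,
outside the Odoo ORM; FetchQueue, schedule and drain are illustrative names,
not part of the module:

    class FetchQueue:
        def __init__(self):
            self._jobs = []

        def schedule(self, repository, number):
            # _get_or_schedule: at most one pending job per PR
            if (repository, number) not in self._jobs:
                self._jobs.append((repository, number))

        def drain(self, load_pr, commit=lambda: None):
            # _check_fetch: one job at a time, commit after each fetch
            while self._jobs:
                load_pr(*self._jobs.pop(0))
                commit()
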
@@ -565,6 +547,9 @@ class PullRequests(models.Model):
         res = super(PullRequests, self)._auto_init()
         tools.create_unique_index(
             self._cr, 'runbot_merge_unique_pr_per_target', self._table, ['number', 'target', 'repository'])
+        self._cr.execute("CREATE INDEX IF NOT EXISTS runbot_merge_pr_head "
+                         "ON runbot_merge_pull_requests "
+                         "USING hash (head)")
         return res
 
     @property
@@ -576,6 +561,10 @@
     @api.model
     def create(self, vals):
         pr = super().create(vals)
+        c = self.env['runbot_merge.commit'].search([('sha', '=', pr.head)])
+        if c and c.statuses:
+            pr._validate(json.loads(c.statuses))
+
         if pr.state not in ('closed', 'merged'):
             self.env['runbot_merge.pull_requests.tagging'].create({
                 'pull_request': pr.number,
@@ -698,10 +687,17 @@
         if stagings:
             stagings._validate()
 
+    _sql_constraints = [
+        ('unique_sha', 'unique (sha)', 'no duplicated commit'),
+    ]
+
     def _auto_init(self):
         res = super(Commit, self)._auto_init()
-        tools.create_unique_index(
-            self._cr, 'runbot_merge_unique_statuses', self._table, ['sha'])
+        self._cr.execute("""
+            CREATE INDEX IF NOT EXISTS runbot_merge_unique_statuses
+            ON runbot_merge_commit
+            USING hash (sha)
+        """)
         return res
 
 class Stagings(models.Model):
@@ -900,3 +896,10 @@
             'target': prs[0].target.id,
             'prs': [(4, pr.id, 0) for pr in prs],
         })
+
+class FetchJob(models.Model):
+    _name = 'runbot_merge.fetch_job'
+
+    active = fields.Boolean(default=True)
+    repository = fields.Char(index=True)
+    number = fields.Integer(index=True)
diff --git a/runbot_merge/tests/fake_github/__init__.py b/runbot_merge/tests/fake_github/__init__.py
index d770c97b..547c1df5 100644
--- a/runbot_merge/tests/fake_github/__init__.py
+++ b/runbot_merge/tests/fake_github/__init__.py
@@ -36,12 +36,12 @@ class APIResponse(responses.BaseResponse):
         headers = self.get_headers()
         body = io.BytesIO(b'')
-        if r:
+        if r is not None:
             body = io.BytesIO(json.dumps(r).encode('utf-8'))
 
         return responses.HTTPResponse(
             status=status,
-            reason=r.get('message') if r else "bollocks",
+            reason=r.get('message') if isinstance(r, dict) else "bollocks",
             body=body,
             headers=headers,
             preload_content=False,
         )
@@ -190,7 +190,8 @@ class Repo(object):
         return self._save_tree(o)
 
     def api(self, path, request):
-        for method, pattern, handler in self._handlers:
+        # a better version would be some sort of longest-match?
+        for method, pattern, handler in sorted(self._handlers, key=lambda t: -len(t[1])):
             if method and request.method != method:
                 continue
 
@@ -302,6 +303,42 @@ class Repo(object):
             "parents": [{"sha": p} for p in c.parents],
         })
 
+    def _read_statuses(self, _, ref):
+        try:
+            c = self.commit(ref)
+        except KeyError:
+            return (404, None)
+
+        return (200, {
+            'sha': c.id,
+            'total_count': len(c.statuses),
+            # TODO: combined?
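+            # (note: GitHub's real combined-status endpoint also reports
+            # an aggregate 'state'; the fake can omit it because
+            # GH.statuses() above only reads 'sha' and 'statuses')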
+            'statuses': [
+                {'context': context, 'state': state}
+                for state, context, _ in reversed(c.statuses)
+            ]
+        })
+
+    def _read_issue(self, r, number):
+        try:
+            issue = self.issues[int(number)]
+        except KeyError:
+            return (404, None)
+        attr = {'pull_request': True} if isinstance(issue, PR) else {}
+        return (200, {'number': issue.number, **attr})
+
+    def _read_issue_comments(self, r, number):
+        try:
+            issue = self.issues[int(number)]
+        except KeyError:
+            return (404, None)
+        return (200, [{
+            'user': {'login': author},
+            'body': body,
+        } for author, body in issue.comments
+          if not body.startswith('REVIEW')
+        ])
+
     def _create_issue_comment(self, r, number):
         try:
             issue = self.issues[int(number)]
@@ -319,6 +356,31 @@
             'user': { 'login': "user" },
         })
 
+    def _read_pr(self, r, number):
+        try:
+            pr = self.issues[int(number)]
+        except KeyError:
+            return (404, None)
+        # FIXME: dedup with Client
+        return (200, {
+            'number': pr.number,
+            'head': {
+                'sha': pr.head,
+                'label': pr.label,
+            },
+            'base': {
+                'ref': pr.base,
+                'repo': {
+                    'name': self.name.split('/')[1],
+                    'full_name': self.name,
+                },
+            },
+            'title': pr.title,
+            'body': pr.body,
+            'commits': pr.commits,
+            'user': {'login': pr.user},
+        })
+
     def _edit_pr(self, r, number):
         try:
             pr = self.issues[int(number)]
@@ -346,6 +408,21 @@
 
         return (200, {})
 
+    def _read_pr_reviews(self, _, number):
+        pr = self.issues.get(int(number))
+        if not isinstance(pr, PR):
+            return (404, None)
+
+        return (200, [{
+            'user': {'login': author},
+            'state': r.group(1),
+            'body': r.group(2),
+        }
+            for author, body in pr.comments
+            for r in [re.match(r'REVIEW (\w+)\n\n(.*)', body)]
+            if r
+        ])
+
     def _add_labels(self, r, number):
         try:
             pr = self.issues[int(number)]
@@ -424,12 +501,17 @@
         # nb: there's a different commits at /commits with repo-level metadata
         ('GET', r'git/commits/(?P<sha>[0-9A-Fa-f]{40})', _read_commit),
         ('POST', r'git/commits', _create_commit),
+        ('GET', r'commits/(?P<ref>[^/]+)/status', _read_statuses),
+        ('GET', r'issues/(?P<number>\d+)', _read_issue),
+        ('GET', r'issues/(?P<number>\d+)/comments', _read_issue_comments),
         ('POST', r'issues/(?P<number>\d+)/comments', _create_issue_comment),
         ('POST', r'merges', _do_merge),
+        ('GET', r'pulls/(?P<number>\d+)', _read_pr),
         ('PATCH', r'pulls/(?P<number>\d+)', _edit_pr),
+        ('GET', r'pulls/(?P<number>\d+)/reviews', _read_pr_reviews),
         ('POST', r'issues/(?P<number>\d+)/labels', _add_labels),
         ('DELETE', r'issues/(?P<number>\d+)/labels/(?P
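
The sorted() in Repo.api above works around the routing pitfall its comment
alludes to: with naive first-match iteration, a short pattern such as
issues/(?P<number>\d+) can shadow the more specific
issues/(?P<number>\d+)/comments. Below is a standalone sketch of that
dispatch idea, using full-match anchoring plus the same longest-pattern-first
ordering; routes and dispatch are illustrative names, not part of the test
suite:

    import re

    routes = [
        ('GET', r'issues/(?P<number>\d+)', lambda number: ('issue', number)),
        ('GET', r'issues/(?P<number>\d+)/comments', lambda number: ('comments', number)),
    ]

    def dispatch(method, path):
        # longest patterns first, so the more specific route wins
        for m, pattern, handler in sorted(routes, key=lambda t: -len(t[1])):
            if m != method:
                continue
            match = re.fullmatch(pattern, path)
            if match:
                # named groups become handler keyword arguments, mirroring
                # how the fake's handlers receive 'number', 'ref', etc.
                return handler(**match.groupdict())
        return (404, None)

    assert dispatch('GET', 'issues/42/comments') == ('comments', '42')
    assert dispatch('GET', 'issues/42') == ('issue', '42')

Anchoring with re.fullmatch already prevents prefix shadowing on exact
paths; the length sort keeps behaviour predictable when several anchored
patterns could match the same path.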