"""
|
|
Replaces relevant fixtures to allow running the test suite against github
|
|
actual (instead of a mocked version).
|
|
|
|
To enable this plugin, load it using ``-p runbot_merge.tests.remote``
|
|
|
|
.. WARNING:: this requires running ``python -mpytest`` from the root of the
|
|
runbot repository, running ``pytest`` directly will not pick it
|
|
up (as it does not setup ``sys.path``)
|
|
|
|
Configuration:
|
|
|
|
* an ``odoo`` binary in the path, which runs the relevant odoo; to ensure a
|
|
clean slate odoo is re-started and a new database is created before each
|
|
test
|
|
|
|
* pytest.ini (at the root of the runbot repo) with the following sections and
|
|
keys
|
|
|
|
``github``
|
|
- owner, the name of the account (personal or org) under which test repos
|
|
will be created & deleted
|
|
- token, either personal or oauth, must have the scopes ``public_repo``,
|
|
``delete_repo`` and ``admin:repo_hook``, if personal the owner must be
|
|
the corresponding user account, not an org
|
|
|
|
``role_reviewer``, ``role_self_reviewer`` and ``role_other``
|
|
- name (optional)
|
|
- user, the login of the user for that role
|
|
- token, a personal access token with the ``public_repo`` scope (otherwise
|
|
the API can't leave comments)
|
|
|
|
.. warning:: the accounts must *not* be flagged, or the webhooks on
|
|
commenting or creating reviews will not trigger, and the
|
|
tests will fail
|
|
|
|
* either ``ngrok`` or ``lt`` (localtunnel) available on the path. ngrok with
|
|
a configured account is recommended: ngrok is more reliable than localtunnel
|
|
but a free account is necessary to get a high-enough rate limiting for some
|
|
of the multi-repo tests to work
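
For example, a minimal configuration could look like this (all account
names and tokens below are placeholders, not real values)::

    [github]
    owner = test-org
    token = <personal access token>

    [role_reviewer]
    name = Reviewer
    user = reviewer-account
    token = <personal access token>

    [role_self_reviewer]
    user = self-reviewer-account
    token = <personal access token>

    [role_other]
    user = other-account
    token = <personal access token>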

Finally, the tests aren't 100% reliable as they rely on quite a bit of
network traffic; it's possible that the tests fail due to network issues
rather than logic errors.
"""
import base64
import collections
import configparser
import itertools
import re
import socket
import subprocess
import sys
import time
import xmlrpc.client

import pytest
import requests

# Should be pytest_configure, but apparently once a plugin is registered
# its fixtures don't get unloaded even if it's unregistered, so prevent
# registering the local plugin entirely. This works because explicit
# plugins (-p) are loaded before conftest and conftest-specified plugins
# (officially:
# https://docs.pytest.org/en/latest/writing_plugins.html#plugin-discovery-order-at-tool-startup).
def pytest_addhooks(pluginmanager):
    pluginmanager.set_blocked('local')

def pytest_addoption(parser):
    parser.addoption("--no-delete", action="store_true",
                     help="Don't delete repos after a failed run")
    parser.addoption(
        '--tunnel', action="store", type="choice", choices=['ngrok', 'localtunnel'],
        help="Which tunneling method to use to expose the local Odoo server "
             "to github's webhooks. ngrok is more reliable, but creating a "
             "free account is necessary to avoid rate-limiting issues "
             "(anonymous tunnels are rate-limited to 20 incoming queries "
             "per minute, free accounts get 40, and the multi-repo batching "
             "tests will blow through the former); localtunnel has no "
             "rate-limiting but its servers are way less reliable")

@pytest.fixture(scope="session")
def config(pytestconfig):
    """ Flattens the pytest.ini file into a dict of dicts:
    section name -> {key: value}
    """
    conf = configparser.ConfigParser(interpolation=None)
    conf.read([pytestconfig.inifile])
    return {
        name: dict(s.items())
        for name, s in conf.items()
    }

PORT = 8069

def wait_for_hook(n=1):
    # TODO: find a better way to wait for the roundtrip of actions which
    #       can trigger webhooks
    time.sleep(10 * n)

@pytest.fixture
def page():
    s = requests.Session()
    def get(url):
        r = s.get('http://localhost:{}{}'.format(PORT, url))
        r.raise_for_status()
        return r.content
    return get

def wait_for_server(db, timeout=120):
    """ Polls for the server to be responsive & to have installed our module.

    Raises socket.timeout on failure
    """
    limit = time.time() + timeout
    while True:
        try:
            uid = xmlrpc.client.ServerProxy(
                'http://localhost:{}/xmlrpc/2/common'.format(PORT))\
                .authenticate(db, 'admin', 'admin', {})
            xmlrpc.client.ServerProxy(
                'http://localhost:{}/xmlrpc/2/object'.format(PORT)) \
                .execute_kw(db, uid, 'admin', 'runbot_merge.batch', 'search',
                            [[]], {'limit': 1})
            break
        except ConnectionRefusedError:
            if time.time() > limit:
                raise socket.timeout()

@pytest.fixture(scope='session')
def remote_p():
    return True

@pytest.fixture
def env(request):
    """
    creates a db & an environment object as a proxy to xmlrpc calls
    """
    db = request.config.getoption('--db')
    p = subprocess.Popen([
        'odoo', '--http-port', str(PORT),
        '--addons-path', request.config.getoption('--addons-path'),
        '-d', db, '-i', 'runbot_merge',
        '--load', 'base,web,runbot_merge',
        '--max-cron-threads', '0',  # disable cron threads (we're running crons by hand)
    ])

    try:
        wait_for_server(db)

        yield Environment(PORT, db)

        db_service = xmlrpc.client.ServerProxy('http://localhost:{}/xmlrpc/2/db'.format(PORT))
        db_service.drop('admin', db)
    finally:
        p.terminate()
        p.wait(timeout=30)

@pytest.fixture(scope='session')
def tunnel(request):
    """ Creates a tunnel to localhost:8069 using ngrok or localtunnel;
    yields the publicly routable address & terminates the process at the
    end of the session
    """

    tunnel = request.config.getoption('--tunnel')
    if tunnel == 'ngrok':
        p = subprocess.Popen(['ngrok', 'http', '--region', 'eu', str(PORT)])
        time.sleep(5)
        try:
            r = requests.get('http://localhost:4040/api/tunnels')
            r.raise_for_status()
            yield next(
                t['public_url']
                for t in r.json()['tunnels']
                if t['proto'] == 'https'
            )
        finally:
            p.terminate()
            p.wait(30)
    elif tunnel == 'localtunnel':
        p = subprocess.Popen(['lt', '-p', str(PORT)], stdout=subprocess.PIPE)
        try:
            r = p.stdout.readline()
            m = re.match(br'your url is: (https://.*\.localtunnel\.me)', r)
            assert m, "could not get the localtunnel URL"
            yield m.group(1).decode('ascii')
        finally:
            p.terminate()
            p.wait(30)
    else:
        raise ValueError("Unsupported tunnel method %s" % tunnel)

ROLES = ['reviewer', 'self_reviewer', 'other']
@pytest.fixture(autouse=True)
def users(env, github, config):
    # get github login of "current user"
    r = github.get('https://api.github.com/user')
    r.raise_for_status()
    rolemap = {
        'user': r.json()['login']
    }
    for role in ROLES:
        data = config['role_' + role]
        username = data['user']
        rolemap[role] = username
        if role == 'other':
            continue
        env['res.partner'].create({
            'name': data.get('name', username),
            'github_login': username,
            'reviewer': role == 'reviewer',
            'self_reviewer': role == 'self_reviewer',
        })

    return rolemap

@pytest.fixture
def project(env, config):
    return env['runbot_merge.project'].create({
        'name': 'odoo',
        'github_token': config['github']['token'],
        'github_prefix': 'hansen',
        'branch_ids': [(0, 0, {'name': 'master'})],
        'required_statuses': 'legal/cla,ci/runbot',
    })

@pytest.fixture(scope='session')
def github(config):
    s = requests.Session()
    s.headers['Authorization'] = 'token {}'.format(config['github']['token'])
    return s

@pytest.fixture
def owner(config):
    return config['github']['owner']

@pytest.fixture
def make_repo(request, config, project, github, tunnel, users, owner):
    # check whether "owner" is a user or an org, as the repo-creation
    # endpoint differs
    q = github.get('https://api.github.com/users/{}'.format(owner))
    q.raise_for_status()
    if q.json().get('type') == 'Organization':
        endpoint = 'https://api.github.com/orgs/{}/repos'.format(owner)
    else:
        # if not creating repos under an org, ensure the token matches the owner
        assert users['user'] == owner, "when testing against a user (rather than an organisation) the API token must be the user's"
        endpoint = 'https://api.github.com/user/repos'

    repos = []
    def repomaker(name):
        fullname = '{}/{}'.format(owner, name)
        # create repo
        r = github.post(endpoint, json={
            'name': name,
            'has_issues': False,
            'has_projects': False,
            'has_wiki': False,
            'auto_init': False,
            # at least one merge method must be enabled :(
            'allow_squash_merge': False,
            # 'allow_merge_commit': False,
            'allow_rebase_merge': False,
        })
        r.raise_for_status()
        repos.append(fullname)
        # unwatch repo
        github.put('https://api.github.com/repos/{}/subscription'.format(fullname), json={
            'subscribed': False,
            'ignored': True,
        })
        # create webhook
        github.post('https://api.github.com/repos/{}/hooks'.format(fullname), json={
            'name': 'web',
            'config': {
                'url': '{}/runbot_merge/hooks'.format(tunnel),
                'content_type': 'json',
                'insecure_ssl': '1',
            },
            'events': ['pull_request', 'issue_comment', 'status', 'pull_request_review']
        })
        project.write({'repo_ids': [(0, 0, {'name': fullname})]})

        tokens = {
            role: config['role_' + role]['token']
            for role in ROLES
        }
        tokens['user'] = config['github']['token']

        return Repo(github, fullname, tokens)

    yield repomaker

    if not request.config.getoption('--no-delete'):
        for repo in reversed(repos):
            github.delete('https://api.github.com/repos/{}'.format(repo)).raise_for_status()

class Environment:
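    """ Thin proxy to a running Odoo server over XML-RPC, authenticated as
    admin. A minimal usage sketch (the port, database, model name and
    domain are illustrative, not taken from the test suite):

        env = Environment(8069, 'test_db')
        partners = env['res.partner'].search([['github_login', '!=', False]])
    """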
    def __init__(self, port, db):
        self._uid = xmlrpc.client.ServerProxy('http://localhost:{}/xmlrpc/2/common'.format(port)).authenticate(db, 'admin', 'admin', {})
        self._object = xmlrpc.client.ServerProxy('http://localhost:{}/xmlrpc/2/object'.format(port))
        self._db = db

    def __call__(self, model, method, *args, **kwargs):
        return self._object.execute_kw(
            self._db, self._uid, 'admin',
            model, method,
            args, kwargs
        )

    def __getitem__(self, name):
        return Model(self, name)

class Model:
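    """ Proxy to a recordset on the remote Odoo server: wraps a model name
    and a tuple of ids, and translates attribute access and the methods
    below into XML-RPC calls. For instance (model & field names are only
    illustrative):

        prs = env['runbot_merge.pull_requests'].search([])
        prs.mapped('repository.name')  # -> list of repository names
    """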
    __slots__ = ['_env', '_model', '_ids', '_fields']
    def __init__(self, env, model, ids=(), fields=None):
        object.__setattr__(self, '_env', env)
        object.__setattr__(self, '_model', model)
        object.__setattr__(self, '_ids', tuple(ids or ()))

        object.__setattr__(self, '_fields', fields or self._env(self._model, 'fields_get', attributes=['type', 'relation']))

    @property
    def ids(self):
        return self._ids

    def __bool__(self):
        return bool(self._ids)

    def __len__(self):
        return len(self._ids)

    def __eq__(self, other):
        if not isinstance(other, Model):
            return NotImplemented
        return self._model == other._model and self._ids == other._ids

    def __repr__(self):
        return "{}({})".format(self._model, ', '.join(str(id) for id in self._ids))

    def exists(self):
        ids = self._env(self._model, 'exists', self._ids)
        return Model(self._env, self._model, ids)

    def search(self, domain):
        ids = self._env(self._model, 'search', domain)
        return Model(self._env, self._model, ids)

    def create(self, values):
        return Model(self._env, self._model, [self._env(self._model, 'create', values)])

    def write(self, values):
        return self._env(self._model, 'write', self._ids, values)

    def read(self, fields):
        return self._env(self._model, 'read', self._ids, fields)

    def unlink(self):
        return self._env(self._model, 'unlink', self._ids)

    def _check_progress(self):
        assert self._model == 'runbot_merge.project'
        self._run_cron('runbot_merge.merge_cron')

    def _check_fetch(self):
        assert self._model == 'runbot_merge.project'
        self._run_cron('runbot_merge.fetch_prs_cron')

    def _send_feedback(self):
        assert self._model == 'runbot_merge.project'
        self._run_cron('runbot_merge.feedback_cron')

    def _check_linked_prs_statuses(self):
        assert self._model == 'runbot_merge.pull_requests'
        self._run_cron('runbot_merge.check_linked_prs_status')

    def _run_cron(self, xid):
        _, model, cron_id = self._env('ir.model.data', 'xmlid_lookup', xid)
        assert model == 'ir.cron', "Expected {} to be a cron, got {}".format(xid, model)
        self._env('ir.cron', 'method_direct_trigger', [cron_id])
        # sleep for some time, as the cron may have performed lots of
        # operations which trigger webhooks of their own
        wait_for_hook()

    def __getattr__(self, fieldname):
        if not self._ids:
            return False

        assert len(self._ids) == 1
        if fieldname == 'id':
            return self._ids[0]

        val = self.read([fieldname])[0][fieldname]
        field_description = self._fields[fieldname]
        if field_description['type'] in ('many2one', 'one2many', 'many2many'):
            val = val or []
            if field_description['type'] == 'many2one':
                val = val[:1]  # (id, name) => [id]
            return Model(self._env, field_description['relation'], val)

        return val

    def __setattr__(self, fieldname, value):
        assert self._fields[fieldname]['type'] not in ('many2one', 'one2many', 'many2many')
        self._env(self._model, 'write', self._ids, {fieldname: value})

    def __iter__(self):
        return (
            Model(self._env, self._model, [i], fields=self._fields)
            for i in self._ids
        )

    def mapped(self, path):
        field, *rest = path.split('.', 1)
        descr = self._fields[field]
        if descr['type'] in ('many2one', 'one2many', 'many2many'):
            result = Model(self._env, descr['relation'])
            for record in self:
                result |= getattr(record, field)

            return result.mapped(rest[0]) if rest else result

        assert not rest
        return [getattr(r, field) for r in self]

    def __or__(self, other):
        if not isinstance(other, Model) or self._model != other._model:
            return NotImplemented

        return Model(self._env, self._model, {*self._ids, *other._ids}, fields=self._fields)

class Repo:
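    """ Proxy to a github repository, driven directly through the v3 REST
    API. A typical scenario might look like this (the repo name, file
    contents and messages are illustrative):

        repo = make_repo('test-repo')
        c0 = repo.make_commit(None, 'initial', None, tree={'f': 'a'})
        repo.make_ref('heads/master', c0)
        c1 = repo.make_commit(c0, 'replace f', None, tree={'f': 'b'})
        prx = repo.make_pr('title', 'body', target='master', ctid=c1, user='user')
    """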
    __slots__ = ['name', '_session', '_tokens']
    def __init__(self, session, name, user_tokens):
        self.name = name
        self._session = session
        self._tokens = user_tokens

    def set_secret(self, secret):
        r = self._session.get(
            'https://api.github.com/repos/{}/hooks'.format(self.name))
        response = r.json()
        assert 200 <= r.status_code < 300, response
        [hook] = response

        r = self._session.patch('https://api.github.com/repos/{}/hooks/{}'.format(self.name, hook['id']), json={
            'config': {**hook['config'], 'secret': secret},
        })
        assert 200 <= r.status_code < 300, r.json()

    def get_ref(self, ref):
        if re.match(r'[0-9a-f]{40}', ref):
            return ref

        assert ref.startswith('heads/')
        r = self._session.get('https://api.github.com/repos/{}/git/refs/{}'.format(self.name, ref))
        response = r.json()

        assert 200 <= r.status_code < 300, response
        assert isinstance(response, dict), "{} doesn't exist (got {} refs)".format(ref, len(response))
        assert response['object']['type'] == 'commit'

        return response['object']['sha']

    def make_ref(self, name, commit, force=False):
        assert name.startswith('heads/')
        r = self._session.post('https://api.github.com/repos/{}/git/refs'.format(self.name), json={
            'ref': 'refs/' + name,
            'sha': commit,
        })
        if force and r.status_code == 422:
            r = self._session.patch('https://api.github.com/repos/{}/git/refs/{}'.format(self.name, name), json={'sha': commit, 'force': True})
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()

    def protect(self, branch):
        r = self._session.put('https://api.github.com/repos/{}/branches/{}/protection'.format(self.name, branch), json={
            'required_status_checks': None,
            'enforce_admins': True,
            'required_pull_request_reviews': None,
            'restrictions': None,
        })
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()

    def update_ref(self, name, commit, force=False):
        r = self._session.patch('https://api.github.com/repos/{}/git/refs/{}'.format(self.name, name), json={'sha': commit, 'force': force})
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()

    def make_commit(self, ref, message, author, committer=None, tree=None, wait=True):
        assert tree, "not supporting changes/updates"

        if not ref:  # None / []
            # apparently github refuses to create trees/commits in empty
            # repos using the regular API...
            [(path, contents)] = tree.items()
            r = self._session.put('https://api.github.com/repos/{}/contents/{}'.format(self.name, path), json={
                'path': path,
                'message': message,
                'content': base64.b64encode(contents.encode('utf-8')).decode('ascii'),
                'branch': 'nootherwaytocreateaninitialcommitbutidontwantamasteryet%d' % next(ct)
            })
            assert 200 <= r.status_code < 300, r.json()
            return r.json()['commit']['sha']

        if isinstance(ref, list):
            refs = ref
        else:
            refs = [ref]
        parents = [self.get_ref(r) for r in refs]

        r = self._session.post('https://api.github.com/repos/{}/git/trees'.format(self.name), json={
            'tree': [
                {'path': k, 'mode': '100644', 'type': 'blob', 'content': v}
                for k, v in tree.items()
            ]
        })
        assert 200 <= r.status_code < 300, r.json()
        h = r.json()['sha']

        data = {
            'parents': parents,
            'message': message,
            'tree': h,
        }
        if author:
            data['author'] = author
        if committer:
            data['committer'] = committer

        r = self._session.post('https://api.github.com/repos/{}/git/commits'.format(self.name), json=data)
        assert 200 <= r.status_code < 300, r.json()

        commit_sha = r.json()['sha']

        # if the first parent is an actual ref (rather than a hash) update it
        if parents[0] != refs[0]:
            self.update_ref(refs[0], commit_sha)
        elif wait:
            wait_for_hook()
        return commit_sha

    def make_pr(self, title, body, target, ctid, user, label=None):
        # github only allows PRs from actual branches, so create an actual branch
        ref = label or "temp_trash_because_head_must_be_a_ref_%d" % next(ct)
        self.make_ref('heads/' + ref, ctid)

        r = self._session.post(
            'https://api.github.com/repos/{}/pulls'.format(self.name),
            json={'title': title, 'body': body, 'head': ref, 'base': target,},
            headers={'Authorization': 'token {}'.format(self._tokens[user])}
        )
        assert 200 <= r.status_code < 300, r.json()
        # wait extra for tests which create many PRs and rely on their
        # ordering (test_batching & test_batching_split); it would be nice
        # to make those tests more reliable but not quite sure how...
        wait_for_hook(2)
        return PR(self, 'heads/' + ref, r.json()['number'])

    def post_status(self, ref, status, context='default', **kw):
        assert status in ('error', 'failure', 'pending', 'success')
        r = self._session.post('https://api.github.com/repos/{}/statuses/{}'.format(self.name, self.get_ref(ref)), json={
            'state': status,
            'context': context,
            **kw
        })
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()

    def commit(self, ref):
        # apparently the commits endpoint accepts heads/<branch> as well as
        # refs/heads/<branch>, but the former is not necessarily up to
        # date, unlike the git ref system where a ref starts at heads/
        if ref.startswith('heads/'):
            ref = 'refs/' + ref

        r = self._session.get('https://api.github.com/repos/{}/commits/{}'.format(self.name, ref))
        response = r.json()
        assert 200 <= r.status_code < 300, response

        c = response['commit']
        return Commit(
            id=response['sha'],
            tree=c['tree']['sha'],
            message=c['message'],
            author=c['author'],
            committer=c['committer'],
            parents=[p['sha'] for p in response['parents']],
        )

    def read_tree(self, commit):
        # read tree object
        r = self._session.get('https://api.github.com/repos/{}/git/trees/{}'.format(self.name, commit.tree))
        assert 200 <= r.status_code < 300, r.json()

        # read tree's blobs
        tree = {}
        for t in r.json()['tree']:
            assert t['type'] == 'blob', "we're *not* doing recursive trees in test cases"
            r = self._session.get('https://api.github.com/repos/{}/git/blobs/{}'.format(self.name, t['sha']))
            assert 200 <= r.status_code < 300, r.json()
            tree[t['path']] = base64.b64decode(r.json()['content'])

        return tree

    def is_ancestor(self, sha, of):
        return any(c['sha'] == sha for c in self.log(of))

    def log(self, ref_or_sha):
        for page in itertools.count(1):
            r = self._session.get(
                'https://api.github.com/repos/{}/commits'.format(self.name),
                params={'sha': ref_or_sha, 'page': page}
            )
            assert 200 <= r.status_code < 300, r.json()
            yield from r.json()
            if not r.links.get('next'):
                return

ct = itertools.count()

Commit = collections.namedtuple('Commit', 'id tree message author committer parents')

class PR:
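    """ Proxy to a github pull request, exposing the operations the tests
    need. e.g. (the commands and role names below are illustrative):

        prx.post_comment('hansen r+', 'reviewer')
        prx.post_review('APPROVE', 'reviewer', 'lgtm')
        prx.close()
    """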
    __slots__ = ['number', '_branch', 'repo']
    def __init__(self, repo, branch, number):
        """
        :type repo: Repo
        :type branch: str
        :type number: int
        """
        self.number = number
        self._branch = branch
        self.repo = repo

    @property
    def _session(self):
        return self.repo._session

    @property
    def _pr(self):
        r = self._session.get('https://api.github.com/repos/{}/pulls/{}'.format(self.repo.name, self.number))
        assert 200 <= r.status_code < 300, r.json()
        return r.json()

    @property
    def head(self):
        return self._pr['head']['sha']

    @property
    def user(self):
        return self._pr['user']['login']

    @property
    def state(self):
        return self._pr['state']

    @property
    def labels(self):
        r = self._session.get('https://api.github.com/repos/{}/issues/{}/labels'.format(self.repo.name, self.number))
        assert 200 <= r.status_code < 300, r.json()
        return {label['name'] for label in r.json()}

    @property
    def comments(self):
        r = self._session.get('https://api.github.com/repos/{}/issues/{}/comments'.format(self.repo.name, self.number))
        assert 200 <= r.status_code < 300, r.json()
        return [
            (c['user']['login'], c['body'])
            for c in r.json()
        ]

    def _set_prop(self, prop, value):
        r = self._session.patch('https://api.github.com/repos/{}/pulls/{}'.format(self.repo.name, self.number), json={
            prop: value
        })
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()

    # title and base are write-only properties: the tests only ever set
    # them (via _set_prop), reads go through the _pr payload instead
    @property
    def title(self):
        raise NotImplementedError()
    title = title.setter(lambda self, v: self._set_prop('title', v))

    @property
    def base(self):
        raise NotImplementedError()
    base = base.setter(lambda self, v: self._set_prop('base', v))

    def post_comment(self, body, user):
        r = self._session.post(
            'https://api.github.com/repos/{}/issues/{}/comments'.format(self.repo.name, self.number),
            json={'body': body},
            headers={'Authorization': 'token {}'.format(self.repo._tokens[user])}
        )
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()
        return r.json()['id']

    def edit_comment(self, cid, body, user):
        r = self._session.patch(
            'https://api.github.com/repos/{}/issues/comments/{}'.format(self.repo.name, cid),
            json={'body': body},
            headers={'Authorization': 'token {}'.format(self.repo._tokens[user])}
        )
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()

    def delete_comment(self, cid, user):
        r = self._session.delete(
            'https://api.github.com/repos/{}/issues/comments/{}'.format(self.repo.name, cid),
            headers={'Authorization': 'token {}'.format(self.repo._tokens[user])}
        )
        assert r.status_code == 204, r.json()
        wait_for_hook()

    def open(self):
        self._set_prop('state', 'open')

    def close(self):
        self._set_prop('state', 'closed')

    def push(self, sha):
        self.repo.update_ref(self._branch, sha, force=True)

    def post_review(self, state, user, body):
        r = self._session.post(
            'https://api.github.com/repos/{}/pulls/{}/reviews'.format(self.repo.name, self.number),
            json={'body': body, 'event': state,},
            headers={'Authorization': 'token {}'.format(self.repo._tokens[user])}
        )
        assert 200 <= r.status_code < 300, r.json()
        wait_for_hook()