[IMP] runbot_merge: split the MegaCron into sub-methods

Xavier Morel 2018-10-12 16:15:37 +02:00
parent 5ec2c12454
commit bab7e25a9a
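
The body of the monolithic cron is moved into two sub-methods shown in the hunks below: Stagings.check_status() (resolve an active staging: merge on success, split on failure or timeout) and Branch.try_staging() (build a new staging when the branch has none). Assuming only the method names visible in this diff, the slimmed-down cron reduces to roughly the following sketch (not the verbatim new body):

    # rough sketch of the cron loop after the split
    def _check_progress(self):
        for project in self.search([]):
            # every active staging resolves its own outcome
            for staging in project.mapped('branch_ids.active_staging_id'):
                staging.check_status()
            # every branch without an active staging tries to build one
            for branch in project.branch_ids:
                branch.try_staging()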


@@ -62,186 +62,13 @@ class Project(models.Model):
)
def _check_progress(self):
logger = _logger.getChild('cron')
Batch = self.env['runbot_merge.batch']
PRs = self.env['runbot_merge.pull_requests']
for project in self.search([]):
gh = {repo.name: repo.github() for repo in project.repo_ids}
# check status of staged PRs
for staging in project.mapped('branch_ids.active_staging_id'):
logger.info(
"Checking active staging %s (state=%s)",
staging, staging.state
)
if staging.state == 'success':
repo_name = None
staging_heads = json.loads(staging.heads)
try:
# reverting updates doesn't work if the branches are
# protected (because a revert is basically a force
# push), instead use the tmp branch as a dry-run
tmp_target = 'tmp.' + staging.target.name
# first force-push the current targets to all tmps
for repo_name in staging_heads.keys():
if repo_name.endswith('^'):
continue
g = gh[repo_name]
g.set_ref(tmp_target, g.head(staging.target.name))
staging.check_status()
# then attempt to FF the tmp to the staging
for repo_name, head in staging_heads.items():
if repo_name.endswith('^'):
continue
gh[repo_name].fast_forward(tmp_target, staging_heads.get(repo_name + '^') or head)
# there is still a race condition here, but it's way
# lower than "the entire staging duration"...
for repo_name, head in staging_heads.items():
if repo_name.endswith('^'):
continue
# if the staging has a $repo^ head, merge that,
# otherwise merge the regular (CI'd) head
gh[repo_name].fast_forward(
staging.target.name,
staging_heads.get(repo_name + '^') or head
)
except exceptions.FastForwardError as e:
logger.warning(
"Could not fast-forward successful staging on %s:%s",
repo_name, staging.target.name,
exc_info=True
)
staging.write({
'state': 'ff_failed',
'reason': str(e.__cause__ or e.__context__ or '')
})
else:
prs = staging.mapped('batch_ids.prs')
logger.info(
"%s FF successful, marking %s as merged",
staging, prs
)
prs.write({'state': 'merged'})
for pr in prs:
# FIXME: this is the staging head rather than the actual merge commit for the PR
staging_head = staging_heads.get(pr.repository.name + '^') or staging_heads[pr.repository.name]
gh[pr.repository.name].close(pr.number, 'Merged in {}'.format(staging_head))
finally:
staging.batch_ids.write({'active': False})
staging.write({'active': False})
elif staging.state == 'failure' or project.is_timed_out(staging):
staging.try_splitting()
# else let flow
# check for stageable branches/prs
for branch in project.branch_ids:
logger.info(
"Checking %s (%s) for staging: %s, skip? %s",
branch, branch.name,
branch.active_staging_id,
bool(branch.active_staging_id)
)
if branch.active_staging_id:
continue
branch.try_staging()
# noinspection SqlResolve
self.env.cr.execute("""
SELECT
min(pr.priority) as priority,
array_agg(pr.id) AS match
FROM runbot_merge_pull_requests pr
LEFT JOIN runbot_merge_batch batch ON pr.batch_id = batch.id AND batch.active
WHERE pr.target = %s
-- exclude terminal states (so there's no issue when
-- deleting branches & reusing labels)
AND pr.state != 'merged'
AND pr.state != 'closed'
GROUP BY pr.label
HAVING (bool_or(pr.priority = 0) AND NOT bool_or(pr.state = 'error'))
OR bool_and(pr.state = 'ready')
ORDER BY min(pr.priority), min(pr.id)
""", [branch.id])
# result: [(priority, [pr_id, ...]) for each stageable label]
rows = self.env.cr.fetchall()
priority = rows[0][0] if rows else -1
if priority == 0:
# p=0 take precedence over all else
batched_prs = [
PRs.browse(pr_ids)
for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)
]
elif branch.split_ids:
split_ids = branch.split_ids[0]
logger.info("Found split of PRs %s, re-staging", split_ids.mapped('batch_ids.prs'))
batched_prs = [batch.prs for batch in split_ids.batch_ids]
split_ids.unlink()
elif rows:
# p=1 or p=2
batched_prs = [PRs.browse(pr_ids) for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
else:
continue
staged = Batch
meta = {repo: {} for repo in project.repo_ids}
for repo, it in meta.items():
gh = it['gh'] = repo.github()
it['head'] = gh.head(branch.name)
# create tmp staging branch
gh.set_ref('tmp.{}'.format(branch.name), it['head'])
batch_limit = project.batch_limit
for batch in batched_prs:
if len(staged) >= batch_limit:
break
staged |= Batch.stage(meta, batch)
if staged:
heads = {}
for repo, it in meta.items():
tree = it['gh'].commit(it['head'])['tree']
# ensures staging branches are unique and always
# rebuilt
r = base64.b64encode(os.urandom(12)).decode('ascii')
dummy_head = it['gh']('post', 'git/commits', json={
'message': 'force rebuild\n\nuniquifier: %s' % r,
'tree': tree['sha'],
'parents': [it['head']],
}).json()
# $repo is the head to check, $repo^ is the head to merge
heads[repo.name + '^'] = it['head']
heads[repo.name] = dummy_head['sha']
# create actual staging object
st = self.env['runbot_merge.stagings'].create({
'target': branch.id,
'batch_ids': [(4, batch.id, 0) for batch in staged],
'heads': json.dumps(heads)
})
# create staging branch from tmp
for r in project.repo_ids:
it = meta[r]
_logger.info(
"%s: create staging for %s:%s at %s",
project.name, r.name, branch.name,
heads[r.name]
)
it['gh'].set_ref('staging.{}'.format(branch.name), heads[r.name])
time.sleep(STAGING_SLEEP)
# creating the staging doesn't trigger a write on the prs
# and thus the ->staging taggings, so do that by hand
Tagging = self.env['runbot_merge.pull_requests.tagging']
for pr in st.mapped('batch_ids.prs'):
Tagging.create({
'pull_request': pr.number,
'repository': pr.repository.id,
'state_from': pr._tagstate,
'state_to': 'staged',
})
logger.info("Created staging %s (%s)", st, staged)
# I have no idea why this is necessary for tests to pass, the only
# DB update done not through the ORM is when receiving a notification
# that a PR has been closed
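
Because the target branches are protected (a revert would amount to a force push), the merge above is first rehearsed on throwaway tmp.<target> branches, and the real targets are only fast-forwarded once every rehearsal has succeeded. A minimal standalone sketch of that dry-run pattern, assuming hypothetical client objects exposing the head()/set_ref()/fast_forward() interface the diff relies on, and collapsing the "$repo^" bookkeeping into a single head per repository:

    # clients: repo name -> GitHub client (hypothetical wrapper, same interface as `gh` above)
    # staging_heads: repo name -> commit to merge into `target`
    def merge_staging(clients, target, staging_heads):
        tmp = 'tmp.' + target
        # 1. reset every tmp branch to the current target (force pushes are allowed there)
        for repo, gh in clients.items():
            gh.set_ref(tmp, gh.head(target))
        # 2. dry run: fast-forward every tmp branch to its staged head
        for repo, gh in clients.items():
            gh.fast_forward(tmp, staging_heads[repo])
        # 3. only if every dry run succeeded, fast-forward the real targets; a race
        #    can still slip in between steps 2 and 3, but the window is far smaller
        #    than the whole staging duration
        for repo, gh in clients.items():
            gh.fast_forward(target, staging_heads[repo])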
@@ -391,6 +218,123 @@ class Branch(models.Model):
for b in self:
b.active_staging_id = b.staging_ids
def try_staging(self):
""" Tries to create a staging if the current branch does not already
have one. Returns None if the branch already has a staging or there
is nothing to stage, the newly created staging otherwise.
"""
logger = _logger.getChild('cron')
logger.info(
"Checking %s (%s) for staging: %s, skip? %s",
self, self.name,
self.active_staging_id,
bool(self.active_staging_id)
)
if self.active_staging_id:
return
PRs = self.env['runbot_merge.pull_requests']
# noinspection SqlResolve
self.env.cr.execute("""
SELECT
min(pr.priority) as priority,
array_agg(pr.id) AS match
FROM runbot_merge_pull_requests pr
LEFT JOIN runbot_merge_batch batch ON pr.batch_id = batch.id AND batch.active
WHERE pr.target = %s
-- exclude terminal states (so there's no issue when
-- deleting branches & reusing labels)
AND pr.state != 'merged'
AND pr.state != 'closed'
GROUP BY pr.label
HAVING (bool_or(pr.priority = 0) AND NOT bool_or(pr.state = 'error'))
OR bool_and(pr.state = 'ready')
ORDER BY min(pr.priority), min(pr.id)
""", [self.id])
# result: [(priority, [pr_id, ...]) for each stageable label]
rows = self.env.cr.fetchall()
priority = rows[0][0] if rows else -1
if priority == 0:
# p=0 take precedence over all else
batched_prs = [PRs.browse(pr_ids) for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
elif self.split_ids:
split_ids = self.split_ids[0]
logger.info("Found split of PRs %s, re-staging", split_ids.mapped('batch_ids.prs'))
batched_prs = [batch.prs for batch in split_ids.batch_ids]
split_ids.unlink()
elif rows:
# p=1 or p=2
batched_prs = [PRs.browse(pr_ids) for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
else:
return
Batch = self.env['runbot_merge.batch']
staged = Batch
meta = {repo: {} for repo in self.project_id.repo_ids}
for repo, it in meta.items():
gh = it['gh'] = repo.github()
it['head'] = gh.head(self.name)
# create tmp staging branch
gh.set_ref('tmp.{}'.format(self.name), it['head'])
batch_limit = self.project_id.batch_limit
for batch in batched_prs:
if len(staged) >= batch_limit:
break
staged |= Batch.stage(meta, batch)
if not staged:
return
heads = {}
for repo, it in meta.items():
tree = it['gh'].commit(it['head'])['tree']
# ensures staging branches are unique and always
# rebuilt
r = base64.b64encode(os.urandom(12)).decode('ascii')
dummy_head = it['gh']('post', 'git/commits', json={
'message': 'force rebuild\n\nuniquifier: %s' % r,
'tree': tree['sha'],
'parents': [it['head']],
}).json()
# $repo is the head to check, $repo^ is the head to merge
heads[repo.name + '^'] = it['head']
heads[repo.name] = dummy_head['sha']
# create actual staging object
st = self.env['runbot_merge.stagings'].create({
'target': self.id,
'batch_ids': [(4, batch.id, 0) for batch in staged],
'heads': json.dumps(heads)
})
# create staging branch from tmp
for r in self.project_id.repo_ids:
it = meta[r]
_logger.info(
"%s: create staging for %s:%s at %s",
self.project_id.name, r.name, self.name,
heads[r.name]
)
it['gh'].set_ref('staging.{}'.format(self.name), heads[r.name])
time.sleep(STAGING_SLEEP)
# creating the staging doesn't trigger a write on the prs
# and thus the ->staging taggings, so do that by hand
Tagging = self.env['runbot_merge.pull_requests.tagging']
for pr in st.mapped('batch_ids.prs'):
Tagging.create({
'pull_request': pr.number,
'repository': pr.repository.id,
'state_from': pr._tagstate,
'state_to': 'staged',
})
logger.info("Created staging %s (%s)", st, staged)
return st
class PullRequests(models.Model):
_name = 'runbot_merge.pull_requests'
_order = 'number desc'
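
In try_staging(), the selection query groups PRs by label, reduces each group to (lowest priority, list of PR ids), and only the groups sharing the priority of the first row are staged in this pass: takewhile() stops at the first row with a different priority. An illustrative, made-up result set and how it is consumed:

    from itertools import takewhile

    # rows as fetchall() would return them: (min priority of the label, [pr ids])
    rows = [
        (0, [12, 57]),   # a priority-0 label jumps ahead of everything else
        (0, [61]),
        (1, [34, 35]),   # left for a later staging round
        (2, [99]),
    ]
    priority = rows[0][0] if rows else -1
    batched = [pr_ids for _, pr_ids in takewhile(lambda r: r[0] == priority, rows)]
    # batched == [[12, 57], [61]]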
@@ -952,6 +896,83 @@ class Stagings(models.Model):
return False
def check_status(self):
"""
Checks the status of an active staging:
* merges it if successful
* splits it if failed (or timed out) and more than 1 batch
* marks the PRs as failed otherwise
* ignores if pending (or cancelled or ff_failed but those should also
be disabled)
"""
logger = _logger.getChild('cron')
if not self.active:
logger.info("Staging %s is not active, ignoring status check", self)
return
logger.info("Checking active staging %s (state=%s)", self, self.state)
project = self.target.project_id
if self.state == 'success':
gh = {repo.name: repo.github() for repo in project.repo_ids}
repo_name = None
staging_heads = json.loads(self.heads)
try:
# reverting updates doesn't work if the branches are
# protected (because a revert is basically a force
# push), instead use the tmp branch as a dry-run
tmp_target = 'tmp.' + self.target.name
# first force-push the current targets to all tmps
for repo_name in staging_heads.keys():
if repo_name.endswith('^'):
continue
g = gh[repo_name]
g.set_ref(tmp_target, g.head(self.target.name))
# then attempt to FF the tmp to the staging
for repo_name, head in staging_heads.items():
if repo_name.endswith('^'):
continue
gh[repo_name].fast_forward(tmp_target, staging_heads.get(repo_name + '^') or head)
# there is still a race condition here, but it's way
# lower than "the entire staging duration"...
for repo_name, head in staging_heads.items():
if repo_name.endswith('^'):
continue
# if the staging has a $repo^ head, merge that,
# otherwise merge the regular (CI'd) head
gh[repo_name].fast_forward(
self.target.name,
staging_heads.get(repo_name + '^') or head
)
except exceptions.FastForwardError as e:
logger.warning(
"Could not fast-forward successful staging on %s:%s",
repo_name, self.target.name,
exc_info=True
)
self.write({
'state': 'ff_failed',
'reason': str(e.__cause__ or e.__context__ or '')
})
else:
prs = self.mapped('batch_ids.prs')
logger.info(
"%s FF successful, marking %s as merged",
self, prs
)
prs.write({'state': 'merged'})
for pr in prs:
# FIXME: this is the staging head rather than the actual merge commit for the PR
staging_head = staging_heads.get(pr.repository.name + '^') or staging_heads[pr.repository.name]
gh[pr.repository.name].close(pr.number, 'Merged in {}'.format(staging_head))
finally:
self.batch_ids.write({'active': False})
self.write({'active': False})
elif self.state == 'failure' or project.is_timed_out(self):
self.try_splitting()
class Split(models.Model):
_name = 'runbot_merge.split'
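
The two new methods share the heads mapping serialized on the staging: for each repository, try_staging() stores the throwaway "force rebuild" commit under the plain repository name (the commit CI statuses are checked on) and the real staged head under "<repo>^" (the commit check_status() eventually fast-forwards into the target). An illustrative example of that JSON, with made-up repository names and SHAs:

    heads = {
        "project/server":   "9f3c2b1...",  # dummy uniquifier commit, staging.<branch> points here
        "project/server^":  "41aa07e...",  # actual head to merge into the target
        "project/addons":   "d0e1f2a...",
        "project/addons^":  "7b8c9d0...",
    }
    # check_status() resolves heads.get(repo + '^') or heads[repo] when fast-forwarding.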