mirror of
https://github.com/odoo/runbot.git
synced 2025-03-15 15:35:46 +07:00

The UX around the split of limit and forward port policy (and
especially moving "don't forward port" to the policy) was not really
considered and caused confusion for both me and devs: after having
disabled forward porting, updating the limit would not restore it, but
there would be no indication of such an issue.
This caused odoo/enterprise#68916 to not be forward ported at merge
(despite looking for all the world like it should be), and while
updating the limit post-merge did force a forward-port, that
inconsistency was just as jarring (also not helped by being unable to
create an fw batch via the backend UI because reasons, see
10ca096d86).
Fix this along most axes:
- Notify the user and reset the fw policy if the limit is updated
while `fw=no`.
- Trigger a forward port if the fw policy is updated (from `no`) on a
merged PR, currently only sources.
- Add check & warning to limit update process so it does *not* force a
port (though maybe it should under the assumption that we're
updating the limit anyway? We'll see).
Fixes #953
395 lines
11 KiB
Python
395 lines
11 KiB
Python
import enum
|
|
from collections.abc import Iterator
|
|
from dataclasses import dataclass, field
|
|
from functools import partial
|
|
from operator import contains
|
|
from typing import Callable, List, Optional, Union, Tuple
|
|
|
|
|
|
def tokenize(line: str) -> Iterator[str]:
    """Split a command line into word and punctuation tokens.

    Spaces and tabs separate tokens and are dropped. ``+``, ``=`` and ``,``
    separate tokens and are also emitted as tokens of their own. A ``-`` is
    emitted on its own only when no word is being accumulated; otherwise it
    is kept as part of the current word (e.g. ``rebase-ff``, ``r-``).
    """
    pending = ''
    for char in line:
        if char == '-' and not pending:
            yield char
            continue

        if char not in ' \t+=,':
            # regular character: extend the word being accumulated
            pending += char
            continue

        # separator: flush the accumulated word first
        if pending:
            yield pending
            pending = ''
        # punctuation separators are tokens, whitespace is not
        if not char.isspace():
            yield char

    if pending:
        yield pending
|
|
|
|
|
|
def normalize(it: Iterator[str]) -> Iterator[str]:
|
|
"""Converts shorthand tokens to expanded version
|
|
"""
|
|
for t in it:
|
|
match t:
|
|
case 'r':
|
|
yield 'review'
|
|
case 'r-':
|
|
yield 'review'
|
|
yield '-'
|
|
case _:
|
|
yield t
|
|
|
|
|
|
@dataclass
class Peekable(Iterator[str]):
    """Iterator wrapper allowing a one-token lookahead.

    ``peek`` pulls the next item from the wrapped iterator and remembers it;
    the following ``__next__`` call hands that remembered item back before
    going to the wrapped iterator again.
    """
    # wrapped token stream
    it: Iterator[str]
    # item fetched by ``peek`` and not yet consumed, ``None`` if there is none
    memo: Optional[str] = None

    def __iter__(self) -> Iterator[str]:
        return self

    def __next__(self) -> str:
        buffered = self.memo
        if buffered is None:
            return next(self.it)
        # hand back the peeked item and clear the slot
        self.memo = None
        return buffered

    def peek(self) -> Optional[str]:
        """Return the upcoming item without consuming it, ``None`` at the end."""
        if self.memo is not None:
            return self.memo
        self.memo = next(self.it, None)
        return self.memo
|
|
|
|
|
|
class CommandError(Exception):
    """Error raised when a command line can not be parsed."""
|
|
|
|
|
|
class Approve:
    """Approval command, optionally restricted to specific PR numbers.

    ``ids`` is ``None`` for a blanket ``review+``, or the list of PR numbers
    given through ``r=<number>[,<number>...]``.
    """
    def __init__(self, ids: Optional[List[int]] = None) -> None:
        self.ids = ids

    def __str__(self) -> str:
        if self.ids is None:
            return 'review+'
        return "r=" + ','.join(str(n) for n in self.ids)

    def __contains__(self, item):
        # an unrestricted approval covers everything
        return self.ids is None or item in self.ids

    def fmt(self):
        """Format the approved ids as ``#1, #2``, empty if there are none."""
        return ", ".join(map("#{:d}".format, self.ids or ()))

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield from (
            ("r(eview)+", "approves the PR, if it's a forwardport also approves all non-detached parents"),
            ("r(eview)=<number>", "only approves the specified parents"),
        )
|
|
|
|
class Reject:
    """Command removing the approval of a PR (``review-``)."""
    def __str__(self) -> str:
        return 'review-'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield (
            "r(eview)-",
            "removes approval of a previously approved PR, if the PR is staged the staging will be cancelled",
        )
|
|
|
|
class MergeMethod(enum.Enum):
    """Ways a PR can be integrated; each value is the command's spelling."""
    SQUASH = 'squash'
    REBASE_FF = 'rebase-ff'
    REBASE_MERGE = 'rebase-merge'
    MERGE = 'merge'

    def __str__(self) -> str:
        return self.value

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        # listed in help order, which differs from declaration order
        descriptions = (
            (cls.MERGE, "integrate the PR with a simple merge commit, using the PR description as message"),
            (cls.REBASE_MERGE, "rebases the PR on top of the target branch the integrates with a merge commit, using the PR description as message"),
            (cls.REBASE_FF, "rebases the PR on top of the target branch, then fast-forwards"),
            (cls.SQUASH, "squashes the PR as a single commit on the target branch, using the PR description as message"),
        )
        for method, description in descriptions:
            yield str(method), description
|
|
|
|
|
|
class Retry:
    """Command asking for a PR to be staged again (``retry``)."""
    def __str__(self) -> str:
        return 'retry'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield ("retry", 're-tries staging a PR in the "error" state')
|
|
|
|
|
|
class Check:
    """Command asking for the PR's metadata to be refreshed (``check``)."""
    def __str__(self) -> str:
        return 'check'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield ("check", "fetches or refreshes PR metadata, resets mergebot state")
|
|
|
|
|
|
@dataclass
class Override:
    """Command marking the listed statuses as overridden (``override=a,b``)."""
    # names of the statuses to override, in the order they were given
    statuses: List[str] = field(default_factory=list)

    def __str__(self) -> str:
        return "override=" + ','.join(self.statuses)

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield ("override=<...>", "marks overridable statuses as successful")
|
|
|
|
|
|
@dataclass
class Delegate:
    """Command granting approval rights on the PR.

    An empty ``users`` list is rendered as ``delegate+``; otherwise the
    listed users are rendered as ``delegate=<user>[,<user>...]``.
    """
    # users receiving approval rights, empty for ``delegate+``
    users: List[str] = field(default_factory=list)

    def __str__(self) -> str:
        if self.users:
            return "delegate=" + ','.join(self.users)
        return 'delegate+'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield from (
            ("delegate+", "grants approval rights to the PR author"),
            ("delegate=<...>", "grants approval rights on this PR to the specified github users"),
        )
|
|
|
|
|
|
class Priority(enum.Enum):
    """Staging priority of a PR; rendered as the lowercase member name."""
    DEFAULT = enum.auto()
    PRIORITY = enum.auto()
    ALONE = enum.auto()

    def __str__(self) -> str:
        return str.lower(self.name)

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        descriptions = (
            (cls.DEFAULT, "stages the PR normally"),
            (cls.PRIORITY, "tries to stage this PR first, then adds `default` PRs if the staging has room"),
            (cls.ALONE, "stages this PR only with other PRs of the same priority"),
        )
        for level, description in descriptions:
            yield str(level), description
|
|
|
|
|
|
class CancelStaging:
    """Command requesting cancellation of the current staging (``cancel=staging``)."""
    def __str__(self) -> str:
        return "cancel=staging"

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield (
            "cancel=staging",
            "automatically cancels the current staging when this PR becomes ready",
        )
|
|
|
|
|
|
class SkipChecks:
    """Command bypassing statuses and review (``skipchecks``)."""
    def __str__(self) -> str:
        return 'skipchecks'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield ("skipchecks", "bypasses both statuses and review")
|
|
|
|
|
|
class FW(enum.Enum):
    """Forward-port policy; rendered as ``fw=<lowercase member name>``."""
    DEFAULT = enum.auto()
    NO = enum.auto()
    SKIPCI = enum.auto()
    # SKIPMERGE = enum.auto()

    def __str__(self) -> str:
        return 'fw=' + self.name.lower()

    @classmethod
    def help(cls, is_reviewer: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message.

        The ``fw=skipci`` entry is only listed when ``is_reviewer`` is true.
        """
        entries = [
            (str(cls.NO), "does not forward-port this PR"),
            (str(cls.DEFAULT), "forward-ports this PR normally"),
        ]
        if is_reviewer:
            entries.append(
                (str(cls.SKIPCI), "does not wait for a forward-port's statuses to succeed before creating the next one")
            )
        yield from entries
|
|
|
|
|
|
@dataclass
class Limit:
    """Forward-port limit command.

    Rendered as ``up to <branch>`` when a branch is set, and as ``ignore``
    when ``branch`` is ``None``.
    """
    # branch named after ``up to``, ``None`` for ``ignore``
    branch: Optional[str]

    def __str__(self) -> str:
        target = self.branch
        return 'ignore' if target is None else f'up to {target}'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        yield ("up to <branch>", "only ports this PR forward to the specified branch (included)")
|
|
|
|
|
|
class Close:
    """Command closing the PR (``close``)."""
    def __str__(self) -> str:
        return 'close'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        command = str(cls())
        yield (command, "closes this forward-port")
|
|
|
|
|
|
class Help:
    """Command requesting the help message (``help``)."""
    def __str__(self) -> str:
        return 'help'

    @classmethod
    def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
        """Yield ``(command, description)`` pairs for the help message."""
        command = str(cls())
        yield (command, "displays this help")
|
|
|
|
|
|
# Union of every command type the parser yields.
Command = Union[
    Approve, CancelStaging, Close, Check, Delegate, FW, Help,
    Limit, MergeMethod, Override, Priority, Reject, Retry, SkipChecks,
]
|
|
|
|
|
|
class Parser:
|
|
def __init__(self, line: str) -> None:
|
|
self.it = Peekable(normalize(tokenize(line)))
|
|
|
|
def __iter__(self) -> Iterator[Command]:
|
|
for token in self.it:
|
|
if token.startswith("NOW"):
|
|
# any number of ! is allowed
|
|
if token.startswith("NOW!"):
|
|
yield Priority.ALONE
|
|
elif token == "NOW":
|
|
yield Priority.PRIORITY
|
|
else:
|
|
raise CommandError(f"unknown command {token!r}")
|
|
yield SkipChecks()
|
|
yield CancelStaging()
|
|
continue
|
|
|
|
handler = getattr(type(self), f'parse_{token.replace("-", "_")}', None)
|
|
if handler:
|
|
yield handler(self)
|
|
elif '!' in token:
|
|
raise CommandError("no need to scream")
|
|
else:
|
|
raise CommandError(f"unknown command {token!r}")
|
|
|
|
def assert_next(self, val: str) -> None:
|
|
if (actual := next(self.it, None)) != val:
|
|
raise CommandError(f"expected {val!r}, got {actual!r}")
|
|
|
|
def check_next(self, val: str) -> bool:
|
|
if self.it.peek() == val:
|
|
self.it.memo = None # consume peeked value
|
|
return True
|
|
return False
|
|
|
|
def parse_review(self) -> Union[Approve, Reject]:
|
|
t = next(self.it, None)
|
|
if t == '+':
|
|
return Approve()
|
|
if t == '-':
|
|
return Reject()
|
|
if t == '=':
|
|
t = next(self.it, None)
|
|
if not (t and t.isdecimal()):
|
|
raise CommandError(f"expected PR ID to approve, found {t!r}")
|
|
|
|
ids = [int(t)]
|
|
while self.check_next(','):
|
|
id = next(self.it, None)
|
|
if id and id.isdecimal():
|
|
ids.append(int(id))
|
|
else:
|
|
raise CommandError(f"expected PR ID to approve, found {id!r}")
|
|
return Approve(ids)
|
|
|
|
raise CommandError(f"unknown review {t!r}")
|
|
|
|
def parse_squash(self) -> MergeMethod:
|
|
return MergeMethod.SQUASH
|
|
|
|
def parse_rebase_ff(self) -> MergeMethod:
|
|
return MergeMethod.REBASE_FF
|
|
|
|
def parse_rebase_merge(self) -> MergeMethod:
|
|
return MergeMethod.REBASE_MERGE
|
|
|
|
def parse_merge(self) -> MergeMethod:
|
|
return MergeMethod.MERGE
|
|
|
|
def parse_retry(self) -> Retry:
|
|
return Retry()
|
|
|
|
def parse_check(self) -> Check:
|
|
return Check()
|
|
|
|
def parse_override(self) -> Override:
|
|
self.assert_next('=')
|
|
ci = [next(self.it)]
|
|
while self.check_next(','):
|
|
ci.append(next(self.it))
|
|
return Override(ci)
|
|
|
|
def parse_delegate(self) -> Delegate:
|
|
match next(self.it, None):
|
|
case '+':
|
|
return Delegate()
|
|
case '=':
|
|
delegates = [next(self.it).lstrip('#@')]
|
|
while self.check_next(','):
|
|
delegates.append(next(self.it).lstrip('#@'))
|
|
return Delegate(delegates)
|
|
case d:
|
|
raise CommandError(f"unknown delegation {d!r}")
|
|
|
|
def parse_default(self) -> Priority:
|
|
return Priority.DEFAULT
|
|
|
|
def parse_priority(self) -> Priority:
|
|
return Priority.PRIORITY
|
|
|
|
def parse_alone(self) -> Priority:
|
|
return Priority.ALONE
|
|
|
|
def parse_cancel(self) -> CancelStaging:
|
|
self.assert_next('=')
|
|
self.assert_next('staging')
|
|
return CancelStaging()
|
|
|
|
def parse_skipchecks(self) -> SkipChecks:
|
|
return SkipChecks()
|
|
|
|
def parse_fw(self) -> FW:
|
|
self.assert_next('=')
|
|
f = next(self.it, "")
|
|
try:
|
|
if f in ('disable', 'disabled'):
|
|
return FW.NO
|
|
return FW[f.upper()]
|
|
except KeyError:
|
|
raise CommandError(f"unknown fw configuration {f or None!r}") from None
|
|
|
|
def parse_ignore(self) -> Limit:
|
|
return Limit(None)
|
|
|
|
def parse_up(self) -> Limit:
|
|
self.assert_next('to')
|
|
if limit := next(self.it, None):
|
|
return Limit(limit)
|
|
else:
|
|
raise CommandError("please provide a branch to forward-port to")
|
|
|
|
def parse_close(self) -> Close:
|
|
return Close()
|
|
|
|
def parse_help(self) -> Help:
|
|
return Help()
|