runbot/runbot_merge/models/commands.py
2025-02-28 16:12:40 +01:00

397 lines
11 KiB
Python

import enum
from collections.abc import Iterator
from dataclasses import dataclass, field
from typing import List, Optional, Union, Tuple
def tokenize(line: str) -> Iterator[str]:
cur = ''
for c in line:
if c == '-' and not cur:
yield '-'
elif c in ' \t+=,':
if cur:
yield cur
cur = ''
if not c.isspace():
yield c
else:
cur += c
if cur:
yield cur
def normalize(it: Iterator[str]) -> Iterator[str]:
"""Converts shorthand tokens to expanded version
"""
for t in it:
match t:
case 'r':
yield 'review'
case 'rplush':
yield 'review'
yield '+'
case 'r-':
yield 'review'
yield '-'
case _:
yield t
@dataclass
class Peekable(Iterator[str]):
it: Iterator[str]
memo: Optional[str] = None
def __iter__(self) -> Iterator[str]:
return self
def __next__(self) -> str:
if self.memo is not None:
v, self.memo = self.memo, None
return v
return next(self.it)
def peek(self) -> Optional[str]:
if self.memo is None:
self.memo = next(self.it, None)
return self.memo
class CommandError(Exception):
pass
class Approve:
def __init__(self, ids: Optional[List[int]] = None) -> None:
self.ids = ids
def __str__(self) -> str:
if self.ids is not None:
ids = ','.join(map(str, self.ids))
return f"r={ids}"
return 'review+'
def __contains__(self, item):
if self.ids is None:
return True
return item in self.ids
def fmt(self):
return ", ".join(f"#{n:d}" for n in (self.ids or ()))
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "r(eview)+", "approves the PR, if it's a forwardport also approves all non-detached parents"
yield "r(eview)=<number>", "only approves the specified parents"
class Reject:
def __str__(self) -> str:
return 'review-'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "r(eview)-", "removes approval of a previously approved PR, if the PR is staged the staging will be cancelled"
class MergeMethod(enum.Enum):
SQUASH = 'squash'
REBASE_FF = 'rebase-ff'
REBASE_MERGE = 'rebase-merge'
MERGE = 'merge'
def __str__(self) -> str:
return self.value
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield str(cls.MERGE), "integrate the PR with a simple merge commit, using the PR description as message"
yield str(cls.REBASE_MERGE), "rebases the PR on top of the target branch the integrates with a merge commit, using the PR description as message"
yield str(cls.REBASE_FF), "rebases the PR on top of the target branch, then fast-forwards"
yield str(cls.SQUASH), "squashes the PR as a single commit on the target branch, using the PR description as message"
class Retry:
def __str__(self) -> str:
return 'retry'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "retry", 're-tries staging a PR in the "error" state'
class Check:
def __str__(self) -> str:
return 'check'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "check", "fetches or refreshes PR metadata, resets mergebot state"
@dataclass
class Override:
statuses: List[str] = field(default_factory=list)
def __str__(self) -> str:
return f"override={','.join(self.statuses)}"
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "override=<...>", "marks overridable statuses as successful"
@dataclass
class Delegate:
users: List[str] = field(default_factory=list)
def __str__(self) -> str:
if not self.users:
return 'delegate+'
return f"delegate={','.join(self.users)}"
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "delegate+", "grants approval rights to the PR author"
yield "delegate=<...>", "grants approval rights on this PR to the specified github users"
class Priority(enum.Enum):
DEFAULT = enum.auto()
PRIORITY = enum.auto()
ALONE = enum.auto()
def __str__(self) -> str:
return self.name.lower()
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield str(cls.DEFAULT), "stages the PR normally"
yield str(cls.PRIORITY), "tries to stage this PR first, then adds `default` PRs if the staging has room"
yield str(cls.ALONE), "stages this PR only with other PRs of the same priority"
class CancelStaging:
def __str__(self) -> str:
return "cancel=staging"
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "cancel=staging", "automatically cancels the current staging when this PR becomes ready"
class SkipChecks:
def __str__(self) -> str:
return 'skipchecks'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "skipchecks", "bypasses both statuses and review"
class FW(enum.Enum):
DEFAULT = enum.auto()
NO = enum.auto()
SKIPCI = enum.auto()
SKIPMERGE = enum.auto()
def __str__(self) -> str:
return f'fw={self.name.lower()}'
@classmethod
def help(cls, is_reviewer: bool) -> Iterator[Tuple[str, str]]:
yield str(cls.NO), "does not forward-port this PR"
yield str(cls.DEFAULT), "forward-ports this PR normally"
if is_reviewer:
yield str(cls.SKIPCI), "does not wait for a forward-port's statuses to succeed before creating the next one"
yield str(cls.SKIPMERGE), "does not wait for the source to be merged before creating forward ports"
@dataclass
class Limit:
branch: Optional[str]
def __str__(self) -> str:
if self.branch is None:
return 'ignore'
return f'up to {self.branch}'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield "up to <branch>", "only ports this PR forward to the specified branch (included)"
class Close:
def __str__(self) -> str:
return 'close'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield str(cls()), "closes this forward-port"
class Help:
def __str__(self) -> str:
return 'help'
@classmethod
def help(cls, _: bool) -> Iterator[Tuple[str, str]]:
yield str(cls()), "displays this help"
Command = Union[
Approve,
CancelStaging,
Close,
Check,
Delegate,
FW,
Help,
Limit,
MergeMethod,
Override,
Priority,
Reject,
Retry,
SkipChecks,
]
class Parser:
def __init__(self, line: str) -> None:
self.it = Peekable(normalize(tokenize(line)))
def __iter__(self) -> Iterator[Command]:
for token in self.it:
if token.startswith("NOW"):
# any number of ! is allowed
if token.startswith("NOW!"):
yield Priority.ALONE
elif token == "NOW":
yield Priority.PRIORITY
else:
raise CommandError(f"unknown command {token!r}")
yield SkipChecks()
yield CancelStaging()
continue
handler = getattr(type(self), f'parse_{token.replace("-", "_")}', None)
if handler:
yield handler(self)
elif '!' in token:
raise CommandError("no need to scream")
else:
raise CommandError(f"unknown command {token!r}")
def assert_next(self, val: str) -> None:
if (actual := next(self.it, None)) != val:
raise CommandError(f"expected {val!r}, got {actual!r}")
def check_next(self, val: str) -> bool:
if self.it.peek() == val:
self.it.memo = None # consume peeked value
return True
return False
def parse_review(self) -> Union[Approve, Reject]:
t = next(self.it, None)
if t == '+':
return Approve()
if t == '-':
return Reject()
if t == '=':
t = next(self.it, None)
if not (t and t.isdecimal()):
raise CommandError(f"expected PR ID to approve, found {t!r}")
ids = [int(t)]
while self.check_next(','):
id = next(self.it, None)
if id and id.isdecimal():
ids.append(int(id))
else:
raise CommandError(f"expected PR ID to approve, found {id!r}")
return Approve(ids)
raise CommandError(f"unknown review {t!r}")
def parse_squash(self) -> MergeMethod:
return MergeMethod.SQUASH
def parse_rebase_ff(self) -> MergeMethod:
return MergeMethod.REBASE_FF
def parse_rebase_merge(self) -> MergeMethod:
return MergeMethod.REBASE_MERGE
def parse_merge(self) -> MergeMethod:
return MergeMethod.MERGE
def parse_retry(self) -> Retry:
return Retry()
def parse_check(self) -> Check:
return Check()
def parse_override(self) -> Override:
self.assert_next('=')
ci = [next(self.it)]
while self.check_next(','):
ci.append(next(self.it))
return Override(ci)
def parse_delegate(self) -> Delegate:
match next(self.it, None):
case '+':
return Delegate()
case '=':
delegates = [next(self.it).lstrip('#@')]
while self.check_next(','):
delegates.append(next(self.it).lstrip('#@'))
return Delegate(delegates)
case d:
raise CommandError(f"unknown delegation {d!r}")
def parse_default(self) -> Priority:
return Priority.DEFAULT
def parse_priority(self) -> Priority:
return Priority.PRIORITY
def parse_alone(self) -> Priority:
return Priority.ALONE
def parse_cancel(self) -> CancelStaging:
self.assert_next('=')
self.assert_next('staging')
return CancelStaging()
def parse_skipchecks(self) -> SkipChecks:
return SkipChecks()
def parse_fw(self) -> FW:
self.assert_next('=')
f = next(self.it, "")
try:
if f in ('disable', 'disabled'):
return FW.NO
return FW[f.upper()]
except KeyError:
raise CommandError(f"unknown fw configuration {f or None!r}") from None
def parse_ignore(self) -> Limit:
return Limit(None)
def parse_up(self) -> Limit:
self.assert_next('to')
if limit := next(self.it, None):
return Limit(limit)
else:
raise CommandError("please provide a branch to forward-port to")
def parse_close(self) -> Close:
return Close()
def parse_help(self) -> Help:
return Help()