517 lines
17 KiB
Python
517 lines
17 KiB
Python
|
#!/usr/bin/env python
|
||
|
|
||
|
"""
|
||
|
Checks versions from the requirements files against distribution-provided
|
||
|
versions, taking distribution's Python version in account e.g. if checking
|
||
|
against a release which bundles Python 3.5, checks the 3.5 version of
|
||
|
requirements.
|
||
|
|
||
|
* only shows requirements for which at least one release diverges from the
|
||
|
matching requirements version
|
||
|
* empty or green cells mean that specific release matches its requirement (happens when
|
||
|
checking multiple releases: one of the other releases may mismatch the its
|
||
|
requirements necessating showing the row)
|
||
|
|
||
|
This script was heavily reworked but is not in a final version:
|
||
|
TODO:
|
||
|
- add legends
|
||
|
- better management of cache
|
||
|
- add meta info on cells (mainly to genearte a better html report)
|
||
|
- warn/ko reason
|
||
|
- wheel + link
|
||
|
- original debian package name + link
|
||
|
...
|
||
|
|
||
|
"""
|
||
|
|
||
|
import argparse
|
||
|
import gzip
|
||
|
import itertools
|
||
|
import json
|
||
|
import os
|
||
|
import re
|
||
|
import shutil
|
||
|
import tempfile
|
||
|
|
||
|
try:
|
||
|
import ansitoimg
|
||
|
except ImportError:
|
||
|
ansitoimg = None
|
||
|
|
||
|
from abc import ABC, abstractmethod
|
||
|
from pathlib import Path
|
||
|
from sys import stderr, stdout
|
||
|
from typing import Dict, List, Optional, Tuple
|
||
|
from urllib.request import HTTPError
|
||
|
from urllib.request import urlopen as _urlopen
|
||
|
|
||
|
from packaging.markers import Marker
|
||
|
from packaging.requirements import Requirement
|
||
|
from packaging.tags import mac_platforms # noqa: PLC2701
|
||
|
from packaging.utils import canonicalize_name
|
||
|
|
||
|
from pip._internal.index.package_finder import (
|
||
|
LinkEvaluator, # noqa: PLC2701
|
||
|
)
|
||
|
from pip._internal.models.link import Link # noqa: PLC2701
|
||
|
from pip._internal.models.target_python import TargetPython # noqa: PLC2701
|
||
|
|
||
|
Version = Tuple[int, ...]
|
||
|
|
||
|
# shared beween debian and ubuntu
|
||
|
SPECIAL = {
|
||
|
'pytz': 'tz',
|
||
|
'libsass': 'libsass-python',
|
||
|
}
|
||
|
SUPPORTED_FORMATS = ('txt', 'ansi', 'svg', 'html', 'json')
|
||
|
PLATFORM_CODES = ('linux', 'win32', 'darwin')
|
||
|
PLATFORM_NAMES = ('Linux', 'Win', 'OSX')
|
||
|
|
||
|
|
||
|
def urlopen(url):
|
||
|
file_name = "".join(c if c.isalnum() else '_' for c in url)
|
||
|
os.makedirs('/tmp/package_versions_cache/', exist_ok=True)
|
||
|
file_path = f'/tmp/package_versions_cache/{file_name}'
|
||
|
if not os.path.isfile(file_path):
|
||
|
response = _urlopen(url)
|
||
|
with open(file_path, 'wb') as fw:
|
||
|
fw.write(response.read())
|
||
|
return open(file_path, 'rb') # noqa: SIM115
|
||
|
|
||
|
|
||
|
def parse_version(vstring: str) -> Optional[Version]:
|
||
|
if not vstring:
|
||
|
return None
|
||
|
return tuple(map(int, vstring.split('.')))
|
||
|
|
||
|
|
||
|
def cleanup_debian_version(s: str) -> str:
|
||
|
""" Try to strip the garbage from the version string, just remove everything
|
||
|
following the first `+`, `~` or `-`
|
||
|
"""
|
||
|
return re.match(r'''
|
||
|
(?:\d+:)? # debian crud prefix
|
||
|
(.*?) # the shit we actually want
|
||
|
(?:~|\+|-|\.dfsg)
|
||
|
.*
|
||
|
''', s, flags=re.VERBOSE)[1]
|
||
|
|
||
|
|
||
|
class PipPackage:
|
||
|
def __init__(self, name):
|
||
|
self.name = name
|
||
|
infos = json.load(urlopen(f'https://pypi.org/pypi/{name}/json'))
|
||
|
self.info = infos['info']
|
||
|
self.last_serial = infos['last_serial']
|
||
|
self.releases = infos['releases']
|
||
|
self.urls = infos['urls']
|
||
|
self.vulnerabilities = infos['vulnerabilities']
|
||
|
|
||
|
def has_wheel_for(self, version, python_version, platform):
|
||
|
if version is None:
|
||
|
return (False, False, False)
|
||
|
py_version_info = python_version.split('.')
|
||
|
if len(py_version_info) == 2:
|
||
|
py_version_info = (py_version_info[0], py_version_info[1], 0)
|
||
|
releases = self.releases
|
||
|
has_wheel_for_version = False
|
||
|
has_any_wheel = False
|
||
|
has_wheel_in_another_version = False
|
||
|
platforms = None
|
||
|
if platform == 'darwin':
|
||
|
platforms = list(mac_platforms((15, 0), 'x86_64'))
|
||
|
elif platform == 'win32':
|
||
|
platforms = ['win32', 'win-amd64']
|
||
|
else:
|
||
|
assert platform == 'linux'
|
||
|
|
||
|
target_python = TargetPython(
|
||
|
platforms=platforms,
|
||
|
py_version_info=py_version_info,
|
||
|
abis=None,
|
||
|
implementation=None,
|
||
|
)
|
||
|
le = LinkEvaluator(
|
||
|
project_name=self.name,
|
||
|
canonical_name=canonicalize_name(self.name),
|
||
|
formats={"binary", "source"},
|
||
|
target_python=target_python,
|
||
|
allow_yanked=True,
|
||
|
ignore_requires_python=False,
|
||
|
)
|
||
|
for release in releases[version]:
|
||
|
if release['filename'].endswith('.whl'):
|
||
|
has_any_wheel = True
|
||
|
is_candidate, _result = le.evaluate_link(Link(
|
||
|
comes_from=None,
|
||
|
url=release['url'],
|
||
|
requires_python=release['requires_python'],
|
||
|
yanked_reason=release['yanked_reason'],
|
||
|
))
|
||
|
if is_candidate:
|
||
|
if release['filename'].endswith('.whl'):
|
||
|
has_wheel_for_version = has_wheel_in_another_version = True
|
||
|
break
|
||
|
|
||
|
if not has_wheel_for_version and has_any_wheel:
|
||
|
# TODO, we should prefer a version matching the one from a distro
|
||
|
for rel_version, rel in releases.items():
|
||
|
for release in rel:
|
||
|
if not release['filename'].endswith('.whl'):
|
||
|
continue
|
||
|
if any(not s.isdigit() for s in rel_version.split('.')) or parse_version(rel_version) <= parse_version(version):
|
||
|
continue
|
||
|
is_candidate, _result = le.evaluate_link(Link(
|
||
|
comes_from=None,
|
||
|
url=release['url'],
|
||
|
requires_python=release['requires_python'],
|
||
|
yanked_reason=release['yanked_reason'],
|
||
|
))
|
||
|
if is_candidate:
|
||
|
has_wheel_in_another_version = True
|
||
|
stderr.write(f'WARNING: Wheel found for {self.name} ({python_version} {platform}) in {rel_version}\n')
|
||
|
return (has_wheel_for_version, has_any_wheel, has_wheel_in_another_version)
|
||
|
|
||
|
return (has_wheel_for_version, has_any_wheel, has_wheel_in_another_version)
|
||
|
|
||
|
|
||
|
class Distribution(ABC):
|
||
|
def __init__(self, release):
|
||
|
self._release = release
|
||
|
|
||
|
@abstractmethod
|
||
|
def get_version(self, package: str) -> Optional[Version]:
|
||
|
...
|
||
|
|
||
|
def __str__(self):
|
||
|
return f'{type(self).__name__.lower()} {self._release}'
|
||
|
|
||
|
@classmethod
|
||
|
def get(cls, name):
|
||
|
try:
|
||
|
return next(
|
||
|
c
|
||
|
for c in cls.__subclasses__()
|
||
|
if c.__name__.lower() == name
|
||
|
)
|
||
|
except StopIteration:
|
||
|
msg = f"Unknown distribution {name!r}"
|
||
|
raise ValueError(msg)
|
||
|
|
||
|
|
||
|
class Debian(Distribution):
|
||
|
def get_version(self, package):
|
||
|
""" Try to find which version of ``package`` is in Debian release {release}
|
||
|
"""
|
||
|
package = SPECIAL.get(package, package)
|
||
|
# try the python prefix first: some packages have a native of foreign $X and
|
||
|
# either the bindings or a python equivalent at python-X, or just a name
|
||
|
# collision
|
||
|
prefixes = ['python-', '']
|
||
|
if package.startswith('python'):
|
||
|
prefixes = ['']
|
||
|
for prefix in prefixes:
|
||
|
try:
|
||
|
res = json.load(urlopen(f'https://sources.debian.org/api/src/{prefix}{package}/'))
|
||
|
except HTTPError:
|
||
|
return 'failed'
|
||
|
if res.get('error') is None:
|
||
|
break
|
||
|
if res.get('error'):
|
||
|
return
|
||
|
|
||
|
try:
|
||
|
return next(
|
||
|
parse_version(cleanup_debian_version(distr['version']))
|
||
|
for distr in res['versions']
|
||
|
if distr['area'] == 'main'
|
||
|
if self._release.lower() in distr['suites']
|
||
|
)
|
||
|
except StopIteration:
|
||
|
return
|
||
|
|
||
|
|
||
|
class Ubuntu(Distribution):
|
||
|
""" Ubuntu doesn't have an API, instead it has a huge text file
|
||
|
"""
|
||
|
def __init__(self, release):
|
||
|
super().__init__(release)
|
||
|
|
||
|
self._packages = {}
|
||
|
# ideally we should request the proper Content-Encoding but PUC
|
||
|
# apparently does not care, and returns a somewhat funky
|
||
|
# content-encoding (x-gzip) anyway
|
||
|
data = gzip.open(
|
||
|
urlopen(f'https://packages.ubuntu.com/source/{release}/allpackages?format=txt.gz'),
|
||
|
mode='rt', encoding='utf-8',
|
||
|
)
|
||
|
for line in itertools.islice(data, 6, None): # first 6 lines is garbage header
|
||
|
# ignore the restricted, security, universe, multiverse tags
|
||
|
m = re.match(r'(\S+) \(([^)]+)\)', line.strip())
|
||
|
assert m, f"invalid line {line.strip()!r}"
|
||
|
self._packages[m[1]] = m[2]
|
||
|
|
||
|
def get_version(self, package):
|
||
|
package = SPECIAL.get(package, package)
|
||
|
for prefix in ['python3-', 'python-', '']:
|
||
|
v = self._packages.get(f'{prefix}{package}')
|
||
|
if v:
|
||
|
return parse_version(cleanup_debian_version(v))
|
||
|
return None
|
||
|
|
||
|
|
||
|
def _strip_comment(line):
|
||
|
return line.split('#', 1)[0].strip()
|
||
|
|
||
|
|
||
|
def parse_requirements(reqpath: Path) -> Dict[str, List[Tuple[str, Marker]]]:
|
||
|
""" Parses a requirement file to a dict of {package: [(version, markers)]}
|
||
|
|
||
|
The env markers express *whether* that specific dep applies.
|
||
|
"""
|
||
|
reqs = {}
|
||
|
with reqpath.open('r', encoding='utf-8') as f:
|
||
|
for req_line in f:
|
||
|
req_line = _strip_comment(req_line)
|
||
|
if not req_line:
|
||
|
continue
|
||
|
requirement = Requirement(req_line)
|
||
|
version = None
|
||
|
if requirement.specifier:
|
||
|
if len(requirement.specifier) > 1:
|
||
|
raise NotImplementedError('multi spec not supported yet')
|
||
|
version = next(iter(requirement.specifier)).version
|
||
|
reqs.setdefault(requirement.name, []).append((version, requirement.marker))
|
||
|
return reqs
|
||
|
|
||
|
|
||
|
def ok(text):
|
||
|
return f'\033[92m{text}\033[39m'
|
||
|
|
||
|
|
||
|
def em(text):
|
||
|
return f'\033[94m{text}\033[39m'
|
||
|
|
||
|
|
||
|
def warn(text):
|
||
|
return f'\033[93m{text}\033[39m'
|
||
|
|
||
|
|
||
|
def ko(text):
|
||
|
return f'\033[91m{text}\033[39m'
|
||
|
|
||
|
|
||
|
def default(text):
|
||
|
return text
|
||
|
|
||
|
|
||
|
def main(args):
|
||
|
checkers = [
|
||
|
Distribution.get(distro)(release)
|
||
|
for version in args.release
|
||
|
for (distro, release) in [version.split(':')]
|
||
|
]
|
||
|
|
||
|
stderr.write("Fetch Python versions...\n")
|
||
|
pyvers = [
|
||
|
'.'.join(map(str, checker.get_version('python3-defaults')[:2]))
|
||
|
for checker in checkers
|
||
|
]
|
||
|
|
||
|
uniq = sorted(set(pyvers), key=parse_version)
|
||
|
platforms = PLATFORM_NAMES if args.check_pypi else PLATFORM_NAMES[:1]
|
||
|
platform_codes = PLATFORM_CODES if args.check_pypi else PLATFORM_CODES[:1]
|
||
|
platform_headers = ['']
|
||
|
python_headers = ['']
|
||
|
table = [platform_headers, python_headers]
|
||
|
# requirements headers
|
||
|
for v in uniq:
|
||
|
for p in platforms:
|
||
|
platform_headers.append(p)
|
||
|
python_headers.append(v)
|
||
|
|
||
|
# distro headers
|
||
|
for checker, version in zip(checkers, pyvers):
|
||
|
platform_headers.append(checker._release[:5])
|
||
|
python_headers.append(version)
|
||
|
|
||
|
reqs = parse_requirements((Path.cwd() / __file__).parent.parent / 'requirements.txt')
|
||
|
if args.filter:
|
||
|
reqs = {r: o for r, o in reqs.items() if any(f in r for f in args.filter.split(','))}
|
||
|
|
||
|
for req, options in reqs.items():
|
||
|
if args.check_pypi:
|
||
|
pip_infos = PipPackage(req)
|
||
|
row = [req]
|
||
|
seps = [' || ']
|
||
|
byver = {}
|
||
|
for pyver in uniq:
|
||
|
# FIXME: when multiple options apply, check which pip uses
|
||
|
# (first-matching. best-matching, latest, ...)
|
||
|
seps[-1] = ' || '
|
||
|
for platform in platform_codes:
|
||
|
platform_version = 'none'
|
||
|
for version, markers in options:
|
||
|
if not markers or markers.evaluate({
|
||
|
'python_version': pyver,
|
||
|
'sys_platform': platform,
|
||
|
}):
|
||
|
if platform == 'linux':
|
||
|
byver[pyver] = version
|
||
|
platform_version = version
|
||
|
break
|
||
|
deco = None
|
||
|
if args.check_pypi:
|
||
|
if platform_version == 'none':
|
||
|
deco = 'ok'
|
||
|
else:
|
||
|
has_wheel_for_version, has_any_wheel, has_wheel_in_another_version = pip_infos.has_wheel_for(platform_version, pyver, platform)
|
||
|
if has_wheel_for_version:
|
||
|
deco = 'ok'
|
||
|
elif has_wheel_in_another_version:
|
||
|
deco = 'ko'
|
||
|
elif has_any_wheel:
|
||
|
deco = 'warn'
|
||
|
if deco in ("ok", None):
|
||
|
if byver.get(pyver, 'none') != platform_version:
|
||
|
deco = 'em'
|
||
|
req_ver = platform_version or 'any'
|
||
|
row.append((req_ver, deco))
|
||
|
seps.append(' | ')
|
||
|
|
||
|
seps[-1] = ' |#| '
|
||
|
# this requirement doesn't apply, ignore
|
||
|
if not byver and not args.all:
|
||
|
continue
|
||
|
|
||
|
for i, c in enumerate(checkers):
|
||
|
req_version = byver.get(pyvers[i], 'none') or 'any'
|
||
|
check_version = '.'.join(map(str, c.get_version(req.lower()) or [])) or None
|
||
|
if req_version != check_version:
|
||
|
deco = 'ko'
|
||
|
if req_version == 'none':
|
||
|
deco = 'ok'
|
||
|
elif req_version == 'any':
|
||
|
if check_version is None:
|
||
|
deco = 'ok'
|
||
|
elif check_version is None:
|
||
|
deco = 'ko'
|
||
|
elif parse_version(req_version) >= parse_version(check_version):
|
||
|
deco = 'warn'
|
||
|
row.append((check_version or '</>', deco))
|
||
|
elif args.all:
|
||
|
row.append((check_version or '</>', 'ok'))
|
||
|
else:
|
||
|
row.append('')
|
||
|
seps.append(' |#| ')
|
||
|
table.append(row)
|
||
|
|
||
|
seps[-1] = ' ' # remove last column separator
|
||
|
|
||
|
stderr.write('\n')
|
||
|
|
||
|
# evaluate width of columns
|
||
|
sizes = [0] * len(table[0])
|
||
|
for row in table:
|
||
|
sizes = [
|
||
|
max(s, len(cell[0] if isinstance(cell, tuple) else cell))
|
||
|
for s, cell in zip(sizes, row)
|
||
|
]
|
||
|
|
||
|
output_format = 'ansi'
|
||
|
if args.format:
|
||
|
output_format = args.format
|
||
|
assert format in SUPPORTED_FORMATS
|
||
|
elif args.output:
|
||
|
output_format = 'txt'
|
||
|
ext = args.output.split('.')[-1]
|
||
|
if ext in SUPPORTED_FORMATS:
|
||
|
output_format = ext
|
||
|
|
||
|
if output_format == 'json':
|
||
|
output = json.dumps(table)
|
||
|
else:
|
||
|
output = ''
|
||
|
# format table
|
||
|
for row in table:
|
||
|
output += ' '
|
||
|
for cell, width, sep in zip(row, sizes, seps):
|
||
|
cell_content = cell
|
||
|
deco = default
|
||
|
if isinstance(cell, tuple):
|
||
|
cell_content, level = cell
|
||
|
if output_format == 'txt' or level is None:
|
||
|
deco = default
|
||
|
elif level == 'ok':
|
||
|
deco = ok
|
||
|
elif level == 'em':
|
||
|
deco = em
|
||
|
elif level == 'warn':
|
||
|
deco = warn
|
||
|
else:
|
||
|
deco = ko
|
||
|
output += deco(f'{cell_content:<{width}}') + sep
|
||
|
output += '\n'
|
||
|
|
||
|
if output_format in ('svg', 'html'):
|
||
|
if not ansitoimg:
|
||
|
output_format = 'ansi'
|
||
|
stderr.write(f'Missing ansitoimg for {output_format} format, switching to ansi')
|
||
|
else:
|
||
|
convert = ansitoimg.ansiToSVG
|
||
|
if output_format == 'html':
|
||
|
convert = ansitoimg.ansiToHTML
|
||
|
with tempfile.NamedTemporaryFile() as tmp:
|
||
|
convert(output, tmp.name, width=(sum(sizes) + sum(len(sep) for sep in seps)), title='requirements-check.py')
|
||
|
output = tmp.read().decode()
|
||
|
# remove mac like bullets
|
||
|
output = output.replace('''<g transform="translate(26,22)">
|
||
|
<circle cx="0" cy="0" r="7" fill="#ff5f57"/>
|
||
|
<circle cx="22" cy="0" r="7" fill="#febc2e"/>
|
||
|
<circle cx="44" cy="0" r="7" fill="#28c840"/>
|
||
|
</g>''', "") #
|
||
|
|
||
|
if args.output:
|
||
|
with open(args.output, 'w', encoding='utf8') as f:
|
||
|
f.write(output)
|
||
|
else:
|
||
|
stdout.write(output)
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
parser = argparse.ArgumentParser(
|
||
|
description=__doc__,
|
||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'release', nargs='+',
|
||
|
help="Release to check against, should use the format '{distro}:{release}' e.g. 'debian:sid'"
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'-a', '--all', action="store_true",
|
||
|
help="Display all requirements even if it matches",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'-o', '--output', help="output path",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'-f', '--format', help=f"Supported format: {', '.join(SUPPORTED_FORMATS)}",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'--update-cache', action="store_true",
|
||
|
help="Ignore the existing package version cache and update them",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'--check-pypi', action="store_true",
|
||
|
help="Check wheel packages",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
'--filter',
|
||
|
help="Comma sepaated list of package to check",
|
||
|
)
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
if args.update_cache:
|
||
|
shutil.rmtree('/tmp/package_versions_cache/')
|
||
|
main(args)
|