diff --git a/CHANGELOG b/CHANGELOG index 1f268f7..2c3df94 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,5 @@ +0.9.0 + - feat: perform compatible version check during update 0.8.13 - update incompatible versions - fix: ckan_uwsgi parser diff --git a/dcor_control/update.py b/dcor_control/update.py index 79e6ffb..cabb08a 100644 --- a/dcor_control/update.py +++ b/dcor_control/update.py @@ -1,14 +1,59 @@ +import functools import os import pathlib import subprocess as sp import sys import click +# replace this import when dropping support for Python 3.8 +# from importlib import resources as importlib_resources +import importlib_resources + + +def get_max_compatible_version(name, ckan_version=None): + """Largest version number of a Python package compatible with current CKAN + + This is done according to the following schema + - parse ``resources/compatible_versions.txt`` + - compare the latest CKAN version in that file with the current CKAN + - if current CKAN matches or is newer, return None + - if current version is lower (compatibility mode), return the + latest package version listed in `compatible_versions.txt` + """ + # parse ``resources/compatible_versions.txt`` + compatible_versions = parse_compatible_versions() + + # compare the latest CKAN version in that file with the current CKAN + vdict0 = compatible_versions[0] + # If CKAN is not installed, `get_package_version` will return `None`. + ckan_cur = get_package_version("ckan") or ckan_version or vdict0["ckan"] + if version_greater_equal(ckan_cur, vdict0["ckan"]): + # if current CKAN matches or is newer, return None + max_version = None + else: + # if current version is lower (compatibility mode), return the + # latest package version listed in `compatible_versions.txt` + for vdict in compatible_versions: + print(vdict) + if vdict["ckan"] == ckan_cur: + max_version = vdict[name] + break + else: + raise IndexError(f"Could not find current CKAN version {ckan_cur} " + f"in 'compatible_versions.txt'") + return max_version def get_package_version(name): - info = sp.check_output("pip show {}".format(name), shell=True) - version = info.decode("utf-8").split("\n")[1].split()[1] + try: + info = sp.check_output(f"pip show {name}", shell=True).decode("utf-8") + except sp.CalledProcessError: + info = "error" + + if info.count("Version:"): + version = info.split("\n")[1].split()[1] + else: + version = None return version @@ -24,7 +69,32 @@ def package_is_editable(name): return False +@functools.cache +def parse_compatible_versions(): + """Return a list of dicts containing compatible versions + + Data are taken from ``resources/compatible_versions.txt`` + + The returned list preserves the order in the file, entries + at the top are at the beginning of the list. + """ + data = importlib_resources.files("dcor_control.resources").joinpath( + "compatible_versions.txt").read_text() + lines = data.split("\n") + header = lines.pop(0).strip().split() + compatible_versions = [] + for line in lines: + version_dict = {} + versions = line.strip().split() + if versions: # ignore empty lines + for key, val in zip(header, versions): + version_dict[key] = val + compatible_versions.append(version_dict) + return compatible_versions + + def update_package(name): + """Update a DCOR-related Python package""" old_ver = get_package_version(name) for path_item in sys.path: if name in path_item: @@ -32,8 +102,8 @@ def update_package(name): # in editable mode. is_git = (pathlib.Path(path_item) / ".git").exists() if is_git: - click.secho("Attempting to update git repository " - + "at '{}'.".format(path_item), bold=True) + click.secho(f"Attempting to update git repository " + f"at '{path_item}'.", bold=True) wd = os.getcwd() os.chdir(path_item) try: @@ -46,10 +116,53 @@ def update_package(name): os.chdir(wd) break else: - click.secho("Updating package '{}' using pip...".format(name), - bold=True) - sp.check_output("pip install --upgrade {}".format(name), - shell=True) + click.secho(f"Updating package '{name}' using pip...", bold=True) + # Perform a compatible version check + req_version = get_max_compatible_version(name) + if req_version is not None: + pin = f"=={req_version}" + else: + pin = "" + sp.check_output(f"pip install --upgrade {name}{pin}", shell=True) new_ver = get_package_version(name) if old_ver != new_ver: - print("...updated {} from {} to {}.".format(name, old_ver, new_ver)) + print(f"...updated {name} from {old_ver} to {new_ver}.") + + +def version_greater(va: str, vb: str): + """Return True when `va` is greater than `vb`, False otherwise""" + va = va.strip() + vb = vb.strip() + + val = va.split(".") + vbl = vb.split(".") + max_len = max(len(val), len(vbl)) + + for ii in range(max_len): + try: + ai = val[ii] + except IndexError: + ai = "0" + try: + bi = vbl[ii] + except IndexError: + bi = "0" + if ai != bi: + if ai.isdigit() and bi.isdigit(): + # We have integers + return int(ai) > int(bi) + else: + # We have strings, compare them alphabetically + return version_greater( + va=".".join([f"{ord(char)}" for char in ai]), + vb=".".join([f"{ord(char)}" for char in bi]) + ) + # versions match + return False + + +def version_greater_equal(va: str, vb: str): + """Return True when `va` is greater/equal to `vb`, False otherwise""" + va = va.strip() + vb = vb.strip() + return va == vb or version_greater(va, vb) diff --git a/setup.py b/setup.py index 34bd73a..aa56952 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ "ckanext-dcor_schemas>0.17.2", "ckanext-dcor_theme>0.6.1", "dcor_shared>=0.5.2", + "importlib_resources", "numpy>=1.21", # CVE-2021-33430 ], # not to be confused with definitions in pyproject.toml [build-system] diff --git a/tests/test_update.py b/tests/test_update.py new file mode 100644 index 0000000..dbdec80 --- /dev/null +++ b/tests/test_update.py @@ -0,0 +1,94 @@ +import pytest +import uuid + +import numpy as np +from dcor_control import update + + +@pytest.mark.parametrize("name,ckan_version,version", + [ + # latest version always returns None + ("ckanext.dc_log_view", None, None), + ("ckanext.dc_log_view", "2.9.0", "0.2.2"), + ]) +def test_get_max_compatible_version(name, ckan_version, version): + assert update.get_max_compatible_version( + name, ckan_version=ckan_version) == version + + +def test_get_max_compatible_version_invalid(): + with pytest.raises(IndexError, match="Could not find current CKAN versio"): + update.get_max_compatible_version(name="ckanext.dc_log_view", + ckan_version="1.2") + + +@pytest.mark.parametrize("name,version", + [ + ("numpy", np.__version__), + (str(uuid.uuid4()).replace("-", ""), None), + ]) +def test_get_package_version(name, version): + assert update.get_package_version(name) == version + + +def test_parse_compatible_versions(): + data = update.parse_compatible_versions() + # make sure this line exists + # 2.10.1 0.3.2 0.14.0 0.8.1 0.13.7 0.18.9 0.7.6 0.5.5 + dict_exp = { + 'ckan': '2.10.1', + 'ckanext.dc_log_view': '0.3.2', + 'ckanext.dc_serve': '0.14.0', + 'ckanext.dc_view': '0.8.1', + 'ckanext.dcor_depot': '0.13.7', + 'ckanext.dcor_schemas': '0.18.9', + 'ckanext.dcor_theme': '0.7.6', + 'dcor_shared': '0.5.5'} + assert dict_exp in data + + +@pytest.mark.parametrize("va,vb,result", [ + # Truly greater + ("2", "1", True), + ("2.1", "2.0", True), + ("2.10.1", "2.10.0", True), + ("2.10.1.b", "2.10.1.a", True), + ("2.10.1.bommel3", "2.10.1.bommel2", True), + ("2.10.1.bommel3 ", "2.10.1.bommel2", True), + # Equal + ("1", "1", False), + ("1.1", "1.1", False), + ("Peter", "Peter", False), + # Smaller + ("2", "3", False), + ("2.1", "2.2", False), + ("2.10.1", "2.10.2", False), + ("2.10.1.b", "2.10.1.c", False), + ("2.10.1.bommel1", "2.10.1.bommel2", False), +]) +def test_version_greater(va, vb, result): + assert update.version_greater(va, vb) == result + + +@pytest.mark.parametrize("va,vb,result", [ + # Truly greater + ("2", "1", True), + ("2.1", "2.0", True), + ("2.10.1", "2.10.0", True), + ("2.10.1.b", "2.10.1.a", True), + ("2.10.1.bommel3", "2.10.1.bommel2", True), + # Equal + ("1", "1", True), + ("1.1", "1.1", True), + ("Peter", "Peter", True), + ("Peter", "Peter ", True), + # Smaller + ("2", "3", False), + ("2 ", "3 ", False), + ("2.1", "2.2", False), + ("2.10.1", "2.10.2", False), + ("2.10.1.b", "2.10.1.c", False), + ("2.10.1.bommel1", "2.10.1.bommel2", False), +]) +def test_version_greater_equal(va, vb, result): + assert update.version_greater_equal(va, vb) == result