From 938466aa47772913624833084d812cca53758e22 Mon Sep 17 00:00:00 2001 From: 434b <17012133+0xricksanchez@users.noreply.github.com> Date: Sun, 23 Oct 2022 12:51:22 +0200 Subject: [PATCH] Added tests for kernel_builder (#117) * More groundwork for #39 --- src/kernel_builder.py | 7 +- src/tests/confs/cfg_setter.ini | 8 ++ src/tests/confs/user.ini | 9 ++ src/tests/test_kernel_builder.py | 213 +++++++++++++++++++++++++++++-- src/tests/test_misc.py | 126 +++++++++++++++++- 5 files changed, 348 insertions(+), 15 deletions(-) create mode 100644 src/tests/confs/cfg_setter.ini create mode 100644 src/tests/confs/user.ini diff --git a/src/kernel_builder.py b/src/kernel_builder.py index fcbfff3..e9d303c 100644 --- a/src/kernel_builder.py +++ b/src/kernel_builder.py @@ -51,7 +51,8 @@ def _run_ssh(self, cmd: str, **kwargs) -> int: warn = kwargs.get("warn", False) return self.ssh_conn.run(f"cd {self.docker_mnt}/{self.kernel_root} && {cmd}", echo=True, warn=warn).exited - def _apply_patches(self): + def _apply_patches(self) -> int: + ret = 0 if self.patch_dir and Path(self.patch_dir).exists(): patch_files = [x for x in Path(self.patch_dir).iterdir()] if patch_files: @@ -59,6 +60,8 @@ def _apply_patches(self): logger.debug(f"Patching: {pfile}") if self._run_ssh(f"patch -p1 < ../../{self.patch_dir}/{pfile.name} > /dev/null", warn=True) != 0: logger.error(f"Failed to apply patch: {pfile}... Continuing anyway!") + ret = 1 + return ret def _build_mrproper(self) -> int: return self._run_ssh(f"{self.cc} ARCH={self.arch} make mrproper") @@ -68,7 +71,7 @@ def _build_arch(self) -> int: if self.arch == "x86_64": cmd += f"make {self.arch}_defconfig" else: - cmd += f" ARCH={self.arch} make defconfig" + cmd += f"ARCH={self.arch} make defconfig" return self._run_ssh(f"{cmd}") def _build_kvm_guest(self) -> int: diff --git a/src/tests/confs/cfg_setter.ini b/src/tests/confs/cfg_setter.ini new file mode 100644 index 0000000..dacd315 --- /dev/null +++ b/src/tests/confs/cfg_setter.ini @@ -0,0 +1,8 @@ +[debuggee] +foo = Bar +baz = False + +[kernel_dl] +mmp = 5.15.67 +tag = +commit = diff --git a/src/tests/confs/user.ini b/src/tests/confs/user.ini new file mode 100644 index 0000000..a3f4260 --- /dev/null +++ b/src/tests/confs/user.ini @@ -0,0 +1,9 @@ +[general] +# Architecture which is targeted +# Currently only x86_64 and arm64 is supported +arch = foobar +ignore_me = ignored + +[debugger] +# Force to rebuild the container +force_rebuild = no diff --git a/src/tests/test_kernel_builder.py b/src/tests/test_kernel_builder.py index ba253da..2577415 100644 --- a/src/tests/test_kernel_builder.py +++ b/src/tests/test_kernel_builder.py @@ -1,16 +1,20 @@ -from ..kernel_builder import KernelBuilder +from ..kernel_builder import KernelBuilder, MISC_DRVS_PATH from pathlib import Path import collections import configparser -from unittest.mock import patch +from unittest.mock import MagicMock, patch, Mock +import docker +import pytest +import uuid USER_INI = Path("configs/user.ini") +CUSTOM_MODULE = Path("examples/like_dbg_confs/echo_module.ini") -def fetch_cfg_value_from_section_and_key(sect: str, key: str) -> str: +def fetch_cfg_value_from_section_and_key(c: Path, sect: str, key: str) -> str: cfg = configparser.ConfigParser() - cfg.read(USER_INI) + cfg.read(c) return cfg[sect][key] @@ -45,21 +49,21 @@ def test_extra_args() -> None: assert are_lists_equal(expected, actual) is True -@patch("src.kernel_builder.KernelBuilder._run_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) def test_get_params_syzkaller(self) -> None: kb = KernelBuilder(**{"kroot": "foo"}) kb.mode = "syzkaller" - assert kb._get_params() == fetch_cfg_value_from_section_and_key("kernel_builder", "syzkaller_args") + assert kb._get_params() == fetch_cfg_value_from_section_and_key(USER_INI, "kernel_builder", "syzkaller_args") -@patch("src.kernel_builder.KernelBuilder._run_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) def test_get_params_generic(self) -> None: kb = KernelBuilder(**{"kroot": "foo"}) kb.mode = "generic" - assert kb._get_params() == fetch_cfg_value_from_section_and_key("kernel_builder", "generic_args") + assert kb._get_params() == fetch_cfg_value_from_section_and_key(USER_INI, "kernel_builder", "generic_args") -@patch("src.kernel_builder.KernelBuilder._run_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) def test_get_params_custom(self) -> None: kb = KernelBuilder(**{"kroot": "foo"}) kb.mode = "custom" @@ -68,7 +72,7 @@ def test_get_params_custom(self) -> None: assert kb._get_params() == "-e FOO -e BAR -d BAZ -d QUX" -@patch("src.kernel_builder.KernelBuilder._run_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) def test_get_params_extra(self) -> None: kb = KernelBuilder(**{"kroot": "foo"}) kb.mode = "" @@ -76,9 +80,194 @@ def test_get_params_extra(self) -> None: assert kb._get_params() == kb.extra_args -@patch("src.kernel_builder.KernelBuilder._run_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) def test_get_params_extra_override(self) -> None: kb = KernelBuilder(**{"kroot": "foo"}) kb.mode = "generic" kb.extra_args = "-d DEBUG_KERNEL" - assert kb._get_params() == fetch_cfg_value_from_section_and_key("kernel_builder", "generic_args").replace("-e DEBUG_KERNEL", kb.extra_args) + assert kb._get_params() == fetch_cfg_value_from_section_and_key(USER_INI, "kernel_builder", "generic_args").replace( + "-e DEBUG_KERNEL", kb.extra_args + ) + + +def test_add_modules() -> None: + p = Path(f"/tmp/{uuid.uuid1().hex}") + kb = KernelBuilder(**{"kroot": p}) + kb.custom_modules = fetch_cfg_value_from_section_and_key(CUSTOM_MODULE, "kernel_builder", "custom_modules") + Path(p / MISC_DRVS_PATH).mkdir(parents=True) + fst = "This is the 1st line.\n" + lst = "This is the last line.\n" + q = Path(p / MISC_DRVS_PATH / "Makefile") + q.touch() + q.write_text(fst) + r = Path(p / MISC_DRVS_PATH / "Kconfig") + r.touch() + r.write_text(f"{fst}\n{lst}") + kb._add_modules() + with open(q, "r") as f: + data = f.readlines() + assert data[-1] != fst + with open(r, "r") as f: + data = f.readlines() + assert data[-1] == lst + assert data[-2] != fst + + +@patch.object(KernelBuilder, "_run_ssh") +def test_build_arch_no_args(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb._build_arch() + mock_m.assert_called_with("CC=gcc make x86_64_defconfig") + + +@patch.object(KernelBuilder, "_run_ssh") +def test_build_arch_llvm(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.arch = "x86_64" + kb.cc = "CC=clang" + kb.llvm_flag = "LLVM=1" + kb._build_arch() + mock_m.assert_called_with("CC=clang LLVM=1 make x86_64_defconfig") + + +@patch.object(KernelBuilder, "_run_ssh") +def test_build_arch_arm(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.arch = "aarch64" + kb._build_arch() + mock_m.assert_called_with(f"CC=gcc ARCH={kb.arch} make defconfig") + + +@patch.object(KernelBuilder, "_run_ssh") +def test_build_kvm_guest(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb._build_kvm_guest() + mock_m.assert_called_with(f"CC=gcc ARCH={kb.arch} make kvm_guest.config") + + +@patch.object(KernelBuilder, "_run_ssh") +def test_configure_kernel(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.mode = "" + kb.extra_args = "-e FOO -d BAR" + kb._configure_kernel() + mock_m.assert_called_with(f"./scripts/config {kb.extra_args}") + + +@patch.object(KernelBuilder, "_run_ssh") +def test_build_mrproper(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb._build_mrproper() + mock_m.assert_called_with("CC=gcc ARCH=x86_64 make mrproper") + + +@patch.object(KernelBuilder, "_run_ssh") +def test_make_clean(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb._make_clean() + mock_m.assert_called_with("make clean") + + +@patch.object(KernelBuilder, "_run_ssh", return_value=0) +def test_make_sucess(mock_m) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb._make() + mock_m.assert_called_with("CC=gcc ARCH=x86_64 make -j$(nproc) modules") + + +@patch.object(KernelBuilder, "_run_ssh", return_value=1) +@patch.object(KernelBuilder, "stop_container", return_value=0) +def test_make_fail(mock_m, mock_k) -> None: + with pytest.raises(SystemExit) as ext: + kb = KernelBuilder(**{"kroot": "foo"}) + kb._make() + assert ext.type == SystemExit + assert ext.value.code == -1 + + +@patch.object(KernelBuilder, "_run_ssh", return_value=0) +def test_apply_patches(mock_m, tmp_path) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.patch_dir = tmp_path + Path(tmp_path / "patch_a").touch() + Path(tmp_path / "patch_b").touch() + assert kb._apply_patches() == 0 + + +@patch.object(KernelBuilder, "_run_ssh", return_value=1) +def test_apply_patches_fail(mock_m, tmp_path) -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.patch_dir = tmp_path + Path(tmp_path / "patch_a").touch() + assert kb._apply_patches() == 1 + + +def test_run_ssh_success() -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.ssh_conn = MagicMock() + cmd = "foobar --baz" + expected = f"cd {kb.docker_mnt}/{kb.kernel_root} && {cmd}" + kb._run_ssh(cmd) + assert kb.ssh_conn.run.assert_called_with(expected, echo=True, warn=False) is None + + +def get_run_kbuilder(mode: str) -> KernelBuilder: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.custom_modules = True + kb.ssh_fwd_port = 2222 + kb.client = docker.DockerClient() + kb.ssh_conn = Mock() + kb.mode = mode + kb.kvm = True + kb.image = "busybox" + kb.dirty = True + return kb + + +@patch.object(KernelBuilder, "_add_modules", return_value=0) +@patch.object(KernelBuilder, "_make_clean", return_value=0) +@patch.object(KernelBuilder, "_build_mrproper", return_value=0) +@patch.object(KernelBuilder, "_apply_patches", return_value=0) +@patch.object(KernelBuilder, "_build_arch", return_value=0) +@patch.object(KernelBuilder, "_build_kvm_guest", return_value=0) +@patch.object(KernelBuilder, "_configure_kernel", return_value=0) +@patch.object(KernelBuilder, "_make", return_value=0) +@patch.object(KernelBuilder, "_wait_for_container", return_value=0) +@patch.object(KernelBuilder, "init_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) +@patch("termios.tcflush", return_value=True) +@patch("builtins.input", lambda *args: "y") +def test_run_no_config_mode(a, b, c, d, e, f, g, h, j, k, lvname, m): + kb = get_run_kbuilder("noconfig") + kb.run() + expected = f"cd {kb.docker_mnt}/{kb.kernel_root}/arch/{kb.arch}/boot && ln -s bzImage Image" + assert kb.ssh_conn.run.assert_called_with(expected, echo=True) is None + + +@patch.object(KernelBuilder, "_add_modules", return_value=0) +@patch.object(KernelBuilder, "_make_clean", return_value=0) +@patch.object(KernelBuilder, "_build_mrproper", return_value=0) +@patch.object(KernelBuilder, "_apply_patches", return_value=0) +@patch.object(KernelBuilder, "_build_arch", return_value=0) +@patch.object(KernelBuilder, "_build_kvm_guest", return_value=0) +@patch.object(KernelBuilder, "_configure_kernel", return_value=0) +@patch.object(KernelBuilder, "_make", return_value=0) +@patch.object(KernelBuilder, "_wait_for_container", return_value=0) +@patch.object(KernelBuilder, "init_ssh", return_value=0) +@patch.object(KernelBuilder, "_run_ssh", return_value=0) +@patch("termios.tcflush", return_value=True) +@patch("builtins.input", lambda *args: "y") +def test_run_config_mode(a, b, c, d, e, f, g, h, j, k, lvname, m): + kb = get_run_kbuilder("config") + kb.run() + expected = f"cd {kb.docker_mnt}/{kb.kernel_root}/arch/{kb.arch}/boot && ln -s bzImage Image" + assert kb.ssh_conn.run.assert_called_with(expected, echo=True) is None + + +def test_wait_for_container() -> None: + kb = KernelBuilder(**{"kroot": "foo"}) + kb.container = Mock() + kb.container.id = 42 + kb.cli = Mock() + kb.cli.inspect_container.return_value = {"State": {"Health": {"Status": "healthy"}}} + kb._wait_for_container() diff --git a/src/tests/test_misc.py b/src/tests/test_misc.py index 56570e1..e511d25 100644 --- a/src/tests/test_misc.py +++ b/src/tests/test_misc.py @@ -1,7 +1,28 @@ -from src.misc import cross_compile, adjust_toolchain_arch, adjust_arch, adjust_qemu_arch, get_sha256_from_file +from src.misc import ( + cfg_setter, + cross_compile, + adjust_toolchain_arch, + adjust_arch, + adjust_qemu_arch, + get_sha256_from_file, + is_reuse, + new_context, + tmux, + tmux_shell, + _set_cfg, + _set_base_cfg, + _cherry_pick, +) import pytest import uuid from pathlib import Path +import configparser +import subprocess as sp +from unittest.mock import MagicMock, patch, Mock + +MMP_INI = Path("src/tests/confs/lkdl_mmp.ini") +CFG_INI = Path("src/tests/confs/cfg_setter.ini") +USR_INI = Path("src/tests/confs/user.ini") def test_cross_compile() -> None: @@ -37,3 +58,106 @@ def test_get_sha256_from_file() -> None: p.write_text("FOOBAR") assert get_sha256_from_file(p) == "24c422e681f1c1bd08286c7aaf5d23a5f088dcdb0b219806b3a9e579244f00c5" p.unlink() + + +@patch.object(sp, "run") +def test_tmux(mock_m) -> None: + cmd = "foo --bar --baz qux" + tmux(cmd) + mock_m.assert_called_with(f"tmux {cmd} > /dev/null", shell=True) + + +@patch("src.misc.tmux") +def test_tmux_shell(mock_m) -> None: + cmd = "foo --bar --baz qux" + tmux_shell(cmd) + mock_m.assert_called_with("send-keys 'foo --bar --baz qux' 'C-m'") + + +@patch("termios.tcflush", return_value=True) +@patch("builtins.input", lambda *args: "y") +def test_is_reuse(self) -> None: + assert is_reuse("foo") is True + + +@patch("termios.tcflush", return_value=True) +@patch("builtins.input", lambda *args: "n") +def test_is_not_reuse(self) -> None: + assert is_reuse("foo") is False + + +def test_set_cfg() -> None: + m = MagicMock + cfg = configparser.ConfigParser() + cfg.read(MMP_INI) + _set_cfg(cfg, m, "kernel_dl", "mmp", ignore_empty=False) + assert m.mmp == "5.15.67" + + +def test_set_cfg_not_ignore_empty() -> None: + m = MagicMock + m.tag = "foo" + cfg = configparser.ConfigParser() + cfg.read(MMP_INI) + _set_cfg(cfg, m, "kernel_dl", "tag", ignore_empty=False) + assert m.tag == "" + + +def test_set_cfg_ignore_empty() -> None: + m = MagicMock + m.tag = "foo" + cfg = configparser.ConfigParser() + cfg.read(MMP_INI) + _set_cfg(cfg, m, "kernel_dl", "tag", ignore_empty=True) + assert m.tag == "foo" + + +def test_cherry_pick() -> None: + m = MagicMock + cfg = configparser.ConfigParser() + cfg.read(CFG_INI) + _cherry_pick(cfg, {"debuggee": ["foo", "baz"], "debugger": ["qux"]}, m, ignore_empty=True) + assert m.foo == "Bar" + assert hasattr(m, "qux") is False + + +def test_set_base_cfg() -> None: + m = Mock() + cfg = configparser.ConfigParser() + cfg.read(CFG_INI) + _set_base_cfg(cfg, ["mmp", "tag"], m, ["debuggee", "kernel_dl", "foobar"], ignore_empty=False) + assert m.foo == "Bar" + assert m.baz == "False" + assert "tag" not in vars(m) + assert "mmp" not in vars(m) + assert "commit" in vars(m) + + +def test_set_base_cfg_ignore_empty() -> None: + m = Mock() + cfg = configparser.ConfigParser() + cfg.read(CFG_INI) + _set_base_cfg(cfg, [], m, ["debuggee", "kernel_dl"], ignore_empty=True) + assert "tag" not in vars(m) + assert "mmp" in vars(m) + assert "commit" not in vars(m) + + +@patch("src.misc.CFGS", [CFG_INI]) +def test_cfg_setter() -> None: + m = Mock() + cfg_setter(m, sections=["kernel_dl"], user_cfg=str(USR_INI), exclude_keys=["ignore_me"], cherry_pick={"debuggee": ["baz"]}) + assert "ignore_me" not in vars(m) + assert m.mmp == "5.15.67" + assert m.baz == "False" + + +def test_new_context(tmp_path) -> None: + initial_path = Path.cwd() + + @new_context(tmp_path) + def with_decorator(): + return Path.cwd() + + assert with_decorator() == tmp_path + assert Path.cwd() == initial_path