diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_load_firmware.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_load_firmware.py new file mode 100755 index 0000000000..af4f845049 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/bin/rpmsg_load_firmware.py @@ -0,0 +1,291 @@ +#!/usr/bin/env python3 +# This file is part of Checkbox. +# +# Copyright 2024 Canonical Ltd. +# Written by: +# Stanley Huang +# +# Checkbox is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3, +# as published by the Free Software Foundation. +# +# Checkbox is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Checkbox. If not, see . + +import sys +import re +import argparse +import time +import logging +import select +from collections import OrderedDict +from systemd import journal +from pathlib import Path + + +class RpmsgLoadFirmwareTest: + + properties = ["firmware_path", "firmware_file", "rpmsg_state"] + + def __init__(self, remoteproc_dev: str): + self._firmware_path = Path( + "/sys/module/firmware_class/parameters/path" + ) + self._firmware_file = Path( + "/sys/class/remoteproc/{}/firmware".format(remoteproc_dev) + ) + self._rpmsg_state = Path( + "/sys/class/remoteproc/{}/state".format(remoteproc_dev) + ) + self._search_patterns = {} + self.expected_events = [] + + def __enter__(self): + self._setup() + return self + + def __exit__(self, type, value, traceback): + self._teardown() + + def _setup(self): + self._previous_config = OrderedDict() + for key in self.properties: + self._previous_config.update({key: getattr(self, key)}) + + def _teardown(self): + self.rpmsg_state = "stop" + for key in self.properties: + if getattr(self, key) != self._previous_config[key]: + setattr(self, key, self._previous_config[key]) + + @property + def firmware_path(self) -> str: + return self._firmware_path.read_text() + + @firmware_path.setter + def firmware_path(self, value: str) -> None: + self._firmware_path.write_text(value) + + @property + def firmware_file(self) -> str: + return self._firmware_file.read_text() + + @firmware_file.setter + def firmware_file(self, value: str) -> None: + self._firmware_file.write_text(value) + + @property + def rpmsg_state(self) -> str: + return self._rpmsg_state.read_text() + + @rpmsg_state.setter + def rpmsg_state(self, value: str) -> None: + self._rpmsg_state.write_text(value) + + @property + def search_pattern(self) -> dict: + return self._search_patterns + + @search_pattern.setter + def search_pattern(self, patterns: dict) -> None: + self._search_patterns.update(patterns) + + def _init_logger(self) -> None: + self.log_reader = journal.Reader() + self.log_reader.this_boot() + self.log_reader.seek_tail() + self.log_reader.get_previous() + + self._poller = select.poll() + self._poller.register(self.log_reader, self.log_reader.get_events()) + + def lookup_reload_logs(self, entry: dict) -> bool: + keep_looking = True + for key, pattern in self._search_patterns.items(): + if re.search(pattern, entry.get("MESSAGE")): + self.expected_events.append((key, entry.get("MESSAGE"))) + if key == "ready": + keep_looking = False + break + + return keep_looking + + def _monitor_journal_logs(self, lookup_func) -> list: + start_time = time.time() + logging.info("# start time: %s", start_time) + + while self._poller.poll(1000): + if self.log_reader.process() != journal.APPEND: + continue + for entry in self.log_reader: + logging.debug(entry["MESSAGE"]) + if entry["MESSAGE"] == "": + continue + if lookup_func(entry) is False: + return self.expected_events + + cur_time = time.time() + if (cur_time - start_time) > 60: + return self.expected_events + + +def verify_load_firmware_logs( + match_records: list, search_stages: list +) -> bool: + logging.info("Validate RPMSG related log from journal logs") + logging.debug(match_records) + actuall_stage = [] + for record in match_records: + if record[1]: + actuall_stage.append(record[0]) + logging.info("%s stage: %s", record[0], record[1]) + + return set(actuall_stage) == set(search_stages) + + +def load_firmware_test(args) -> None: + remote_proc_dev = args.device + target_path = args.path + target_file = args.file + + proc_pattern = "remoteproc remoteproc[0-9]+" + search_patterns = { + "start": r"{}: powering up imx-rproc".format(proc_pattern), + "boot_image": (r"{}: Booting fw image (?P\w*.elf), \w*").format( + proc_pattern + ), + # Please keep latest record in ready stage + # This function will return if latest record been captured. + "ready": (r"{}: remote processor imx-rproc is now up").format( + proc_pattern + ), + } + logging.info("# Start load M4 firmware test") + with RpmsgLoadFirmwareTest(remote_proc_dev) as rpmsg_handler: + rpmsg_handler.search_pattern = search_patterns + rpmsg_handler._init_logger() + if rpmsg_handler.rpmsg_state == "online": + logging.info("Stop the Remote processor") + rpmsg_handler.rpmsg_state = "stop" + logging.info( + "Configure the firmware file to %s and firmware path to %s", + target_file, + target_path, + ) + rpmsg_handler.firmware_path = target_path + rpmsg_handler.firmware_file = target_file + logging.info("Start the Remote processor") + rpmsg_handler.rpmsg_state = "start" + rpmsg_handler._monitor_journal_logs(rpmsg_handler.lookup_reload_logs) + + if verify_load_firmware_logs( + rpmsg_handler.expected_events, + rpmsg_handler._search_patterns.keys(), + ): + logging.info("# Reload M4 firmware successful") + else: + raise SystemExit("# Reload M4 firmware failed") + + # AI: will adding a feature to do testing after load M-core firmware + # extra_testing = None + # if extra_testing is not None: + # module_c = __import__(os.path.splitext(__file__)[0]) + # getattr(module_c, extra_testing)() + + +def dump_firmware_test_mapping(args) -> None: + firmware_mapping = args.mapping + firmware_path = args.path + pattern = r"(\w*):([\w\.-]*)" + output_format = "device: {}\nfirmware: {}\npath: {}\n" + + re_result = re.findall(pattern, firmware_mapping) + if not re_result or firmware_path.strip() == "": + print( + output_format.format( + firmware_mapping, firmware_mapping, firmware_path + ) + ) + return + + for data in re_result: + print(output_format.format(data[0], data[1], firmware_path)) + + +def register_arguments(): + parser = argparse.ArgumentParser(description="RPMSG reload firmware test") + + subparsers = parser.add_subparsers(dest="mode", required=True) + reload_test_parser = subparsers.add_parser("test-reload") + reload_test_parser.add_argument( + "--device", + help="The RPMSG device", + type=str, + required=True, + ) + reload_test_parser.add_argument( + "--path", + help="The directory to store M-core ELF firmware", + type=str, + required=True, + ) + reload_test_parser.add_argument( + "--file", help="M-core ELF firmware file", required=True, type=str + ) + # AI: will adding a feature to do testing after load M-core firmware + # reload_test_parser.add_argument( + # "--extra-test", + # help="RPMSG functional tests", + # choices=["pingpong", "rpmsg-tty"], + # default=None, + # ) + reload_test_parser.set_defaults(test_func=load_firmware_test) + + reload_res_test_parser = subparsers.add_parser("resource-reload") + reload_res_test_parser.add_argument( + "--mapping", + help="The mapping with RPMSG node and M-Core firmware", + type=str, + required=True, + ) + reload_res_test_parser.add_argument( + "--path", + help="The directory to store M-core ELF firmware", + type=str, + required=True, + ) + reload_res_test_parser.set_defaults(test_func=dump_firmware_test_mapping) + + return parser.parse_args() + + +if __name__ == "__main__": + + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + logger_format = "%(asctime)s %(levelname)-8s %(message)s" + date_format = "%Y-%m-%d %H:%M:%S" + + # Log DEBUG and INFO to stdout, others to stderr + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(logging.Formatter(logger_format, date_format)) + + stderr_handler = logging.StreamHandler(sys.stderr) + stderr_handler.setFormatter(logging.Formatter(logger_format, date_format)) + + stdout_handler.setLevel(logging.DEBUG) + stderr_handler.setLevel(logging.WARNING) + + # Add a filter to the stdout handler to limit log records to + # INFO level and below + stdout_handler.addFilter(lambda record: record.levelno <= logging.INFO) + + root_logger.addHandler(stderr_handler) + root_logger.addHandler(stdout_handler) + args = register_arguments() + args.test_func(args) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_load_firmware.py b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_load_firmware.py new file mode 100755 index 0000000000..1d9527f755 --- /dev/null +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/tests/test_rpmsg_load_firmware.py @@ -0,0 +1,242 @@ +import unittest +import sys +import argparse +from pathlib import Path +from io import StringIO +from contextlib import redirect_stdout +from unittest.mock import patch, MagicMock, Mock + +sys.modules["systemd"] = MagicMock() + +import rpmsg_load_firmware + + +class RpmsgLoardFirmwareTests(unittest.TestCase): + """ + Unit tests for RPMSG load firmware test scripts + """ + + def setUp(self) -> None: + test_dev = "remoteproc0" + self._rpmsg_load_fw_test = rpmsg_load_firmware.RpmsgLoadFirmwareTest( + test_dev + ) + self._default_search_pattern = { + "start": r"remoteproc remoteproc[0-9]+: powering up imx-rproc", + "boot_image": ( + r"remoteproc remoteproc[0-9]+: " + r"Booting fw image (?P\w*.elf), \w*" + ), + "ready": ( + r"remoteproc remoteproc[0-9]+: " + r"remote processor imx-rproc is now up" + ), + } + + def test_validate_rpmsg_object(self): + + self.assertEqual( + self._rpmsg_load_fw_test._firmware_path, + Path("/sys/module/firmware_class/parameters/path"), + ) + self.assertEqual( + self._rpmsg_load_fw_test._firmware_file, + Path("/sys/class/remoteproc/remoteproc0/firmware"), + ) + self.assertEqual( + self._rpmsg_load_fw_test._rpmsg_state, + Path("/sys/class/remoteproc/remoteproc0/state"), + ) + self.assertDictEqual( + self._rpmsg_load_fw_test._search_patterns, + {}, + ) + self.assertListEqual(self._rpmsg_load_fw_test.expected_events, []) + + @patch("pathlib.Path.read_text") + def test_get_firmware_path(self, mock_read): + + expected_result = "test-response" + mock_read.return_value = expected_result + self.assertEqual( + self._rpmsg_load_fw_test.firmware_path, expected_result + ) + + @patch("pathlib.Path.write_text") + def test_set_firmware_path(self, mock_write): + expected_result = "test-response" + self._rpmsg_load_fw_test.firmware_path = expected_result + mock_write.assert_called_once_with(expected_result) + + @patch("pathlib.Path.read_text") + def test_get_firmware_file(self, mock_read): + + expected_result = "test-response" + mock_read.return_value = expected_result + self.assertEqual( + self._rpmsg_load_fw_test.firmware_file, expected_result + ) + + @patch("pathlib.Path.write_text") + def test_set_firmware_file(self, mock_write): + expected_result = "test-response" + self._rpmsg_load_fw_test.firmware_file = expected_result + mock_write.assert_called_once_with(expected_result) + + @patch("pathlib.Path.read_text") + def test_get_rpmsg_state(self, mock_read): + + expected_result = "test-response" + mock_read.return_value = expected_result + self.assertEqual(self._rpmsg_load_fw_test.rpmsg_state, expected_result) + + @patch("pathlib.Path.write_text") + def test_set_rpmsg_state(self, mock_write): + expected_result = "test-response" + self._rpmsg_load_fw_test.rpmsg_state = expected_result + mock_write.assert_called_once_with(expected_result) + + def test_get_search_pattern(self): + self.assertDictEqual( + self._rpmsg_load_fw_test.search_pattern, + {}, + ) + + def test_set_search_pattern(self): + expected_result = { + "start": "fake", + "boot_image": "fake", + "ready": "fake", + } + + self._rpmsg_load_fw_test.search_pattern = expected_result + self.assertDictEqual( + self._rpmsg_load_fw_test.search_pattern, + expected_result, + ) + + def test_init_logger(self): + pass + + def test_lookup_reload_logs_not_last_one(self): + self._rpmsg_load_fw_test.search_pattern = self._default_search_pattern + entry = { + "MESSAGE": ( + "Apr 25 07:12:53 ubuntu kernel: remoteproc " + "remoteproc0: powering up imx-rproc" + ) + } + self.assertTrue(self._rpmsg_load_fw_test.lookup_reload_logs(entry)) + self.assertEqual( + self._rpmsg_load_fw_test.expected_events, + [("start", entry["MESSAGE"])], + ) + + def test_lookup_reload_logs_last_one(self): + self._rpmsg_load_fw_test.search_pattern = self._default_search_pattern + entry = { + "MESSAGE": ( + "Apr 25 07:12:53 ubuntu kernel: remoteproc " + "remoteproc0: remote processor imx-rproc is now up" + ) + } + self.assertFalse(self._rpmsg_load_fw_test.lookup_reload_logs(entry)) + self.assertEqual( + self._rpmsg_load_fw_test.expected_events, + [("ready", entry["MESSAGE"])], + ) + + def test_verify_load_firmware_logs_successful(self): + match_records = [("stage1", "message1"), ("stage2", "message2")] + search_stages = ["stage1", "stage2"] + + self.assertTrue( + rpmsg_load_firmware.verify_load_firmware_logs( + match_records, search_stages + ) + ) + + def test_verify_load_firmware_logs_not_match(self): + match_records = [("stage1", "message1")] + search_stages = ["stage1", "stage2"] + + self.assertFalse( + rpmsg_load_firmware.verify_load_firmware_logs( + match_records, search_stages + ) + ) + + +class RpmsgMainFunctionTest(unittest.TestCase): + def test_reload_test_parser(self): + sys.argv = [ + "rpmsg_load_firmware.py", + "test-reload", + "--device", + "remoteproc0", + "--path", + "/home", + "--file", + "test-fw.elf", + ] + args = rpmsg_load_firmware.register_arguments() + + self.assertEqual( + args.test_func, rpmsg_load_firmware.load_firmware_test + ) + self.assertEqual(args.device, "remoteproc0") + self.assertEqual(args.path, "/home") + self.assertEqual(args.file, "test-fw.elf") + + def test_resource_parser(self): + sys.argv = [ + "rpmsg_load_firmware.py", + "resource-reload", + "--mapping", + "remoteproc0:test-fw.elf", + "--path", + "/home", + ] + args = rpmsg_load_firmware.register_arguments() + + self.assertEqual( + args.test_func, rpmsg_load_firmware.dump_firmware_test_mapping + ) + self.assertEqual(args.path, "/home") + self.assertEqual(args.mapping, "remoteproc0:test-fw.elf") + + def test_dump_firmware_test_mapping_successful(self): + mock_args = Mock( + return_value=argparse.Namespace( + mapping="remoteproc0:test-fw.elf remoteproc1:test-fw.elf", + path="/home/ubuntu", + ) + ) + with redirect_stdout(StringIO()) as stdout: + rpmsg_load_firmware.dump_firmware_test_mapping(mock_args()) + self.assertEqual( + stdout.getvalue().strip("\n"), + ( + "device: remoteproc0\nfirmware: test-fw.elf\n" + "path: /home/ubuntu\n\n" + "device: remoteproc1\nfirmware: test-fw.elf\n" + "path: /home/ubuntu" + ), + ) + + def test_dump_firmware_test_mapping_format_invalid(self): + mock_args = Mock( + return_value=argparse.Namespace( + mapping="remoteproest-fw.elf", + path="/home/ubuntu", + ) + ) + args = mock_args() + with redirect_stdout(StringIO()) as stdout: + rpmsg_load_firmware.dump_firmware_test_mapping(args) + self.assertEqual( + stdout.getvalue().strip("\n"), + ("device: {}\nfirmware: {}\npath: {}").format( + args.mapping, args.mapping, args.path + ), + ) diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/category.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/category.pxu index 0474a6350c..8677e2814c 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/category.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/category.pxu @@ -1,3 +1,3 @@ unit: category id: rpmsg -_name: M-series core RPMSG test \ No newline at end of file +_name: Remote Processor Messaging (rpmsg) Framework test diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/jobs.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/jobs.pxu index 4154e50ac0..e0f71d32a5 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/jobs.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/jobs.pxu @@ -32,3 +32,35 @@ user: root plugin: shell command: rpmsg_tests.py --type serial-tty + +unit: job +category_id: rpmsg +id: ce-oem-rpmsg/rp-firmware-mapping +plugin: resource +_summary: List Remote Processor firmwares and RPMSG node mapping +_description: + List firmware and RPMSG node mapping for reload Remote Processor firmware test + RPMSG_RP_FIRMWARE_MAPPING="remoteproc0:test-1.elf remoteproc0:test-2.elf" + RPMSG_RP_FIRMWARE_PATH="/home/user1" +estimated_duration: 2s +flags: preserve-locale +environ: RPMSG_RP_FIRMWARE_MAPPING RPMSG_RP_FIRMWARE_PATH +command: + rpmsg_load_firmware.py resource-reload --path "$RPMSG_RP_FIRMWARE_PATH" --mapping "$RPMSG_RP_FIRMWARE_MAPPING" + +unit: template +template-resource: ce-oem-rpmsg/rp-firmware-mapping +template-id: ce-oem-rpmsg/reload-rp-firmware-test +template-unit: job +_template-summary: Reload Remote Processor firmware via RPMSG +_summary: Reload Remote Processor firmware to {firmware} via RPMSG {device} +id: ce-oem-rpmsg/reload-rp-firmware-test-{firmware}-{device} +category_id: rpmsg +estimated_duration: 60 +requires: manifest.has_rpmsg == 'True' +imports: from com.canonical.plainbox import manifest +flags: also-after-suspend +user: root +plugin: shell +command: + rpmsg_load_firmware.py test-reload --device {device} --file {firmware} --path {path} diff --git a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu index f3c4782d9e..2d1b754d4c 100644 --- a/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu +++ b/contrib/checkbox-ce-oem/checkbox-provider-ce-oem/units/rpmsg/test-plan.pxu @@ -1,6 +1,6 @@ id: ce-oem-rpmsg unit: test plan -_name: M-series core RPMSG test +_name: Remote Processor RPMSG test _description: Check RPMSG framework between A core and M core. include: nested_part: @@ -9,15 +9,18 @@ nested_part: id: ce-oem-rpmsg-manual unit: test plan -_name: M-series core RPMSG manual tests +_name: Remote Processor core RPMSG manual tests _description: Manual RPMSG framework tests include: id: ce-oem-rpmsg-automated unit: test plan -_name: M-series core RPMSG auto tests +_name: Remote Processor RPMSG auto tests _description: Automated RPMSG framework tests +bootstrap_include: + ce-oem-rpmsg/rp-firmware-mapping include: ce-oem-rpmsg/detect-device ce-oem-rpmsg/serial-tty ce-oem-rpmsg/pingpong + ce-oem-rpmsg/reload-rp-firmware-test