diff --git a/src/python_testing/TC_RVCOPSTATE_2_1.py b/src/python_testing/TC_RVCOPSTATE_2_1.py index 01d4e0c88c4b13..18713001c27339 100644 --- a/src/python_testing/TC_RVCOPSTATE_2_1.py +++ b/src/python_testing/TC_RVCOPSTATE_2_1.py @@ -15,16 +15,7 @@ # limitations under the License. # -# test-runner-runs: run1 -# test-runner-run/run1/app: ${CHIP_RVC_APP} -# test-runner-run/run1/factoryreset: True -# test-runner-run/run1/quiet: True -# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json -# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS examples/rvc-app/rvc-common/pics/rvc-app-pics-values --endpoint 1 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto - import logging -from time import sleep - import chip.clusters as Clusters from chip.clusters.Types import NullValue from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main @@ -71,9 +62,6 @@ async def send_pause_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands. def write_to_app_pipe(self, command): with open(self.app_pipe, "w") as app_pipe: app_pipe.write(command + "\n") - # Allow some time for the command to take effect. - # This removes the test flakyness which is very annoying for everyone in CI. - sleep(0.001) def TC_RVCOPSTATE_2_1(self) -> list[str]: return ["RVCOPSTATE.S"] @@ -89,7 +77,16 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set") self.app_pipe = self.app_pipe + str(app_pid) + cluster = Clusters.RvcOperationalState attributes = Clusters.RvcOperationalState.Attributes + RVCOpstate_attr_list = attributes.AttributeList + attribute_list = await self.read_single_attribute_check_success(endpoint=self.endpoint, cluster=cluster, attribute=RVCOpstate_attr_list) + phase_list_attr_id = attributes.PhaseList.attribute_id + current_phase_attr_id = attributes.CurrentPhase.attribute_id + countdown_time_attr_id = attributes.CountdownTime.attribute_id + oprtnlstate_list_attr_id = attributes.OperationalStateList.attribute_id + oprtnlstate_attr_id = attributes.OperationalState.attribute_id + oprtnlerror_attr_id = attributes.OperationalError.attribute_id self.print_step(1, "Commissioning, already done") @@ -97,7 +94,7 @@ async def test_TC_RVCOPSTATE_2_1(self): if self.is_ci: self.write_to_app_pipe('{"Name": "Reset"}') - if self.check_pics("RVCOPSTATE.S.A0000"): + if phase_list_attr_id in attribute_list: self.print_step(2, "Read PhaseList attribute") phase_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.PhaseList) @@ -110,7 +107,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_less_equal(phase_list_len, 32, "PhaseList length(%d) must be less than 32!" % phase_list_len) - if self.check_pics("RVCOPSTATE.S.A0001"): + if current_phase_attr_id in attribute_list: self.print_step(3, "Read CurrentPhase attribute") current_phase = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentPhase) logging.info("CurrentPhase: %s" % (current_phase)) @@ -121,7 +118,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(0 <= current_phase < phase_list_len, "CurrentPhase(%s) must be between 0 and %d" % (current_phase, (phase_list_len - 1))) - if self.check_pics("RVCOPSTATE.S.A0002"): + if countdown_time_attr_id in attribute_list: self.print_step(4, "Read CountdownTime attribute") countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CountdownTime) @@ -131,7 +128,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(countdown_time >= 0 and countdown_time <= 259200, "CountdownTime(%s) must be between 0 and 259200" % countdown_time) - if self.check_pics("RVCOPSTATE.S.A0003"): + if oprtnlstate_list_attr_id in attribute_list: self.print_step(5, "Read OperationalStateList attribute") operational_state_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.OperationalStateList) @@ -154,7 +151,7 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(error_state_present, "The OperationalStateList does not have an ID entry of Error(0x03)") - if self.check_pics("RVCOPSTATE.S.A0004"): + if oprtnlstate_attr_id in attribute_list: self.print_step(6, "Read OperationalState attribute") operational_state = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.OperationalState) @@ -165,63 +162,56 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(operational_state in defined_states or in_range, "OperationalState has an invalid ID value!") if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"): - test_step = "Manually put the device in the stopped state" - self.print_step("6a", test_step) + self.print_step("6a", "Manually put the device in the stopped state") if not self.is_ci: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6b", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kStopped) if self.check_pics("RVCOPSTATE.S.M.ST_RUNNING"): - test_step = "Manually put the device in the running state" - self.print_step("6c", test_step) + self.print_step("6c", "Manually put the device in the running state") if self.is_ci: await self.send_run_change_to_mode_cmd(1) else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6d", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kRunning) if self.check_pics("RVCOPSTATE.S.M.ST_PAUSED"): - test_step = "Manually put the device in the paused state" - self.print_step("6e", test_step) + self.print_step("6e", "Manually put the device in the paused state") if self.is_ci: await self.send_pause_cmd() else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6f", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kPaused) if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"): - test_step = "Manually put the device in the error state" - self.print_step("6g", test_step) + self.print_step("6g", "Manually put the device in the error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6h", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kError) if self.check_pics("RVCOPSTATE.S.M.ST_SEEKING_CHARGER"): - test_step = "Manually put the device in the seeking charger state" - self.print_step("6i", test_step) + self.print_step("6i", "Manually put the device in the seeking charger state") if self.is_ci: self.write_to_app_pipe('{"Name": "Reset"}') await self.send_run_change_to_mode_cmd(1) await self.send_run_change_to_mode_cmd(0) else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6j", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kSeekingCharger) if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"): - test_step = "Manually put the device in the charging state" - self.print_step("6k", test_step) + self.print_step("6k", "Manually put the device in the charging state") if self.is_ci: self.write_to_app_pipe('{"Name": "ChargerFound"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6l", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kCharging) if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"): - test_step = "Manually put the device in the docked state" - self.print_step("6m", test_step) + self.print_step("6m", "Manually put the device in the docked state") if self.is_ci: self.write_to_app_pipe('{"Name": "Charged"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_opstate(step="6n", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked) - if self.check_pics("RVCOPSTATE.S.A0005"): + if oprtnlerror_attr_id in attribute_list: self.print_step(7, "Read OperationalError attribute") operational_error = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.OperationalError) @@ -240,100 +230,87 @@ async def test_TC_RVCOPSTATE_2_1(self): asserts.assert_true(operational_error.errorStateLabel is not None, "ErrorStateLabel should be populated") if self.check_pics("RVCOPSTATE.S.M.ERR_NO_ERROR"): - test_step = "Manually put the device in the no error state" - self.print_step("7a", test_step) + self.print_step("7a", "Manually put the device in the no error state") if not self.is_ci: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7b", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kNoError) if self.check_pics("RVCOPSTATE.S.M.ERR_UNABLE_TO_START_OR_RESUME"): - test_step = "Manually put the device in the unable to start or resume error state" - self.print_step("7c", test_step) + self.print_step("7c", "Manually put the device in the unable to start or resume error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7d", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToStartOrResume) if self.check_pics("RVCOPSTATE.S.M.ERR_UNABLE_TO_COMPLETE_OPERATION"): - test_step = "Manually put the device in the unable to complete operation error state" - self.print_step("7e", test_step) + self.print_step("7e", "Manually put the device in the unable to complete operation error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToCompleteOperation"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7f", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToCompleteOperation) if self.check_pics("RVCOPSTATE.S.M.ERR_COMMAND_INVALID_IN_STATE"): - test_step = "Manually put the device in the command invalid error state" - self.print_step("7g", test_step) + self.print_step("7g", "Manually put the device in the command invalid error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "CommandInvalidInState"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7h", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kCommandInvalidInState) if self.check_pics("RVCOPSTATE.S.M.ERR_FAILED_TO_FIND_CHARGING_DOCK"): - test_step = "Manually put the device in the failed to find dock error state" - self.print_step("7i", test_step) + self.print_step("7i", "Manually put the device in the failed to find dock error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "FailedToFindChargingDock"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7j", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kFailedToFindChargingDock) if self.check_pics("RVCOPSTATE.S.M.ERR_STUCK"): - test_step = "Manually put the device in the stuck error state" - self.print_step("7k", test_step) + self.print_step("7k", "Manually put the device in the stuck error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "Stuck"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7l", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kStuck) if self.check_pics("RVCOPSTATE.S.M.ERR_DUST_BIN_MISSING"): - test_step = "Manually put the device in the dust bin missing error state" - self.print_step("7m", test_step) + self.print_step("7m", "Manually put the device in the dust bin missing error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "DustBinMissing"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7n", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinMissing) if self.check_pics("RVCOPSTATE.S.M.ERR_DUST_BIN_FULL"): - test_step = "Manually put the device in the dust bin full error state" - self.print_step("7o", test_step) + self.print_step("7o", "Manually put the device in the dust bin full error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "DustBinFull"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7p", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinFull) if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_EMPTY"): - test_step = "Manually put the device in the water tank empty error state" - self.print_step("7q", test_step) + self.print_step("7q", "Manually put the device in the water tank empty error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankEmpty"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7r", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankEmpty) if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_MISSING"): - test_step = "Manually put the device in the water tank missing error state" - self.print_step("7s", test_step) + self.print_step("7s", "Manually put the device in the water tank missing error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankMissing"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7t", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankMissing) if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_LID_OPEN"): - test_step = "Manually put the device in the water tank lid open error state" - self.print_step("7u", test_step) + self.print_step("7u", "Manually put the device in the water tank lid open error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankLidOpen"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7v", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankLidOpen) if self.check_pics("RVCOPSTATE.S.M.ERR_MOP_CLEANING_PAD_MISSING"): - test_step = "Manually put the device in the mop cleaning pad missing error state" - self.print_step("7w", test_step) + self.print_step("7w", "Manually put the device in the mop cleaning pad missing error state") if self.is_ci: self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "MopCleaningPadMissing"}') else: - self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n") + input("Press Enter when done.\n") await self.read_and_validate_operror(step="7x", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing) - if __name__ == "__main__": default_matter_test_main()