Skip to content

Commit

Permalink
Updated TC_RVCOPSTATE_2_1 test module:
Browse files Browse the repository at this point in the history
- Removed automatable PICS checks and replaced with attributes available to be gathered from endpoint.
  • Loading branch information
j-ororke committed Jul 10, 2024
1 parent 832661a commit 80aa98b
Showing 1 changed file with 53 additions and 76 deletions.
129 changes: 53 additions & 76 deletions src/python_testing/TC_RVCOPSTATE_2_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,7 @@
# limitations under the License.
#

# test-runner-runs: run1
# test-runner-run/run1/app: ${CHIP_RVC_APP}
# test-runner-run/run1/factoryreset: True
# test-runner-run/run1/quiet: True
# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --PICS examples/rvc-app/rvc-common/pics/rvc-app-pics-values --endpoint 1 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto

import logging
from time import sleep

import chip.clusters as Clusters
from chip.clusters.Types import NullValue
from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main
Expand Down Expand Up @@ -71,9 +62,6 @@ async def send_pause_cmd(self) -> Clusters.Objects.RvcOperationalState.Commands.
def write_to_app_pipe(self, command):
with open(self.app_pipe, "w") as app_pipe:
app_pipe.write(command + "\n")
# Allow some time for the command to take effect.
# This removes the test flakyness which is very annoying for everyone in CI.
sleep(0.001)

def TC_RVCOPSTATE_2_1(self) -> list[str]:
return ["RVCOPSTATE.S"]
Expand All @@ -89,15 +77,24 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.fail("The --app-pid flag must be set when PICS_SDK_CI_ONLY is set")
self.app_pipe = self.app_pipe + str(app_pid)

cluster = Clusters.RvcOperationalState
attributes = Clusters.RvcOperationalState.Attributes
RVCOpstate_attr_list = attributes.AttributeList
attribute_list = await self.read_single_attribute_check_success(endpoint=self.endpoint, cluster=cluster, attribute=RVCOpstate_attr_list)
phase_list_attr_id = attributes.PhaseList.attribute_id
current_phase_attr_id = attributes.CurrentPhase.attribute_id
countdown_time_attr_id = attributes.CountdownTime.attribute_id
oprtnlstate_list_attr_id = attributes.OperationalStateList.attribute_id
oprtnlstate_attr_id = attributes.OperationalState.attribute_id
oprtnlerror_attr_id = attributes.OperationalError.attribute_id

self.print_step(1, "Commissioning, already done")

# Ensure that the device is in the correct state
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')

if self.check_pics("RVCOPSTATE.S.A0000"):
if phase_list_attr_id in attribute_list:
self.print_step(2, "Read PhaseList attribute")
phase_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.PhaseList)

Expand All @@ -110,7 +107,7 @@ async def test_TC_RVCOPSTATE_2_1(self):

asserts.assert_less_equal(phase_list_len, 32, "PhaseList length(%d) must be less than 32!" % phase_list_len)

if self.check_pics("RVCOPSTATE.S.A0001"):
if current_phase_attr_id in attribute_list:
self.print_step(3, "Read CurrentPhase attribute")
current_phase = await self.read_mod_attribute_expect_success(endpoint=self.endpoint, attribute=attributes.CurrentPhase)
logging.info("CurrentPhase: %s" % (current_phase))
Expand All @@ -121,7 +118,7 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.assert_true(0 <= current_phase < phase_list_len,
"CurrentPhase(%s) must be between 0 and %d" % (current_phase, (phase_list_len - 1)))

if self.check_pics("RVCOPSTATE.S.A0002"):
if countdown_time_attr_id in attribute_list:
self.print_step(4, "Read CountdownTime attribute")
countdown_time = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.CountdownTime)
Expand All @@ -131,7 +128,7 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.assert_true(countdown_time >= 0 and countdown_time <= 259200,
"CountdownTime(%s) must be between 0 and 259200" % countdown_time)

if self.check_pics("RVCOPSTATE.S.A0003"):
if oprtnlstate_list_attr_id in attribute_list:
self.print_step(5, "Read OperationalStateList attribute")
operational_state_list = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalStateList)
Expand All @@ -154,7 +151,7 @@ async def test_TC_RVCOPSTATE_2_1(self):

asserts.assert_true(error_state_present, "The OperationalStateList does not have an ID entry of Error(0x03)")

if self.check_pics("RVCOPSTATE.S.A0004"):
if oprtnlstate_attr_id in attribute_list:
self.print_step(6, "Read OperationalState attribute")
operational_state = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalState)
Expand All @@ -165,63 +162,56 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.assert_true(operational_state in defined_states or in_range, "OperationalState has an invalid ID value!")

if self.check_pics("RVCOPSTATE.S.M.ST_STOPPED"):
test_step = "Manually put the device in the stopped state"
self.print_step("6a", test_step)
self.print_step("6a", "Manually put the device in the stopped state")
if not self.is_ci:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6b", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kStopped)
if self.check_pics("RVCOPSTATE.S.M.ST_RUNNING"):
test_step = "Manually put the device in the running state"
self.print_step("6c", test_step)
self.print_step("6c", "Manually put the device in the running state")
if self.is_ci:
await self.send_run_change_to_mode_cmd(1)
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6d", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kRunning)
if self.check_pics("RVCOPSTATE.S.M.ST_PAUSED"):
test_step = "Manually put the device in the paused state"
self.print_step("6e", test_step)
self.print_step("6e", "Manually put the device in the paused state")
if self.is_ci:
await self.send_pause_cmd()
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6f", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kPaused)
if self.check_pics("RVCOPSTATE.S.M.ST_ERROR"):
test_step = "Manually put the device in the error state"
self.print_step("6g", test_step)
self.print_step("6g", "Manually put the device in the error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6h", expected_state=Clusters.OperationalState.Enums.OperationalStateEnum.kError)
if self.check_pics("RVCOPSTATE.S.M.ST_SEEKING_CHARGER"):
test_step = "Manually put the device in the seeking charger state"
self.print_step("6i", test_step)
self.print_step("6i", "Manually put the device in the seeking charger state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "Reset"}')
await self.send_run_change_to_mode_cmd(1)
await self.send_run_change_to_mode_cmd(0)
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6j", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kSeekingCharger)
if self.check_pics("RVCOPSTATE.S.M.ST_CHARGING"):
test_step = "Manually put the device in the charging state"
self.print_step("6k", test_step)
self.print_step("6k", "Manually put the device in the charging state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ChargerFound"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6l", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kCharging)
if self.check_pics("RVCOPSTATE.S.M.ST_DOCKED"):
test_step = "Manually put the device in the docked state"
self.print_step("6m", test_step)
self.print_step("6m", "Manually put the device in the docked state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "Charged"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_opstate(step="6n", expected_state=Clusters.RvcOperationalState.Enums.OperationalStateEnum.kDocked)

if self.check_pics("RVCOPSTATE.S.A0005"):
if oprtnlerror_attr_id in attribute_list:
self.print_step(7, "Read OperationalError attribute")
operational_error = await self.read_mod_attribute_expect_success(endpoint=self.endpoint,
attribute=attributes.OperationalError)
Expand All @@ -240,100 +230,87 @@ async def test_TC_RVCOPSTATE_2_1(self):
asserts.assert_true(operational_error.errorStateLabel is not None, "ErrorStateLabel should be populated")

if self.check_pics("RVCOPSTATE.S.M.ERR_NO_ERROR"):
test_step = "Manually put the device in the no error state"
self.print_step("7a", test_step)
self.print_step("7a", "Manually put the device in the no error state")
if not self.is_ci:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7b", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kNoError)
if self.check_pics("RVCOPSTATE.S.M.ERR_UNABLE_TO_START_OR_RESUME"):
test_step = "Manually put the device in the unable to start or resume error state"
self.print_step("7c", test_step)
self.print_step("7c", "Manually put the device in the unable to start or resume error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToStartOrResume"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7d", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToStartOrResume)
if self.check_pics("RVCOPSTATE.S.M.ERR_UNABLE_TO_COMPLETE_OPERATION"):
test_step = "Manually put the device in the unable to complete operation error state"
self.print_step("7e", test_step)
self.print_step("7e", "Manually put the device in the unable to complete operation error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "UnableToCompleteOperation"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7f", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kUnableToCompleteOperation)
if self.check_pics("RVCOPSTATE.S.M.ERR_COMMAND_INVALID_IN_STATE"):
test_step = "Manually put the device in the command invalid error state"
self.print_step("7g", test_step)
self.print_step("7g", "Manually put the device in the command invalid error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "CommandInvalidInState"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7h", expected_error=Clusters.OperationalState.Enums.ErrorStateEnum.kCommandInvalidInState)
if self.check_pics("RVCOPSTATE.S.M.ERR_FAILED_TO_FIND_CHARGING_DOCK"):
test_step = "Manually put the device in the failed to find dock error state"
self.print_step("7i", test_step)
self.print_step("7i", "Manually put the device in the failed to find dock error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "FailedToFindChargingDock"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7j", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kFailedToFindChargingDock)
if self.check_pics("RVCOPSTATE.S.M.ERR_STUCK"):
test_step = "Manually put the device in the stuck error state"
self.print_step("7k", test_step)
self.print_step("7k", "Manually put the device in the stuck error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "Stuck"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7l", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kStuck)
if self.check_pics("RVCOPSTATE.S.M.ERR_DUST_BIN_MISSING"):
test_step = "Manually put the device in the dust bin missing error state"
self.print_step("7m", test_step)
self.print_step("7m", "Manually put the device in the dust bin missing error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "DustBinMissing"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7n", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinMissing)
if self.check_pics("RVCOPSTATE.S.M.ERR_DUST_BIN_FULL"):
test_step = "Manually put the device in the dust bin full error state"
self.print_step("7o", test_step)
self.print_step("7o", "Manually put the device in the dust bin full error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "DustBinFull"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7p", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kDustBinFull)
if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_EMPTY"):
test_step = "Manually put the device in the water tank empty error state"
self.print_step("7q", test_step)
self.print_step("7q", "Manually put the device in the water tank empty error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankEmpty"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7r", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankEmpty)
if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_MISSING"):
test_step = "Manually put the device in the water tank missing error state"
self.print_step("7s", test_step)
self.print_step("7s", "Manually put the device in the water tank missing error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankMissing"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7t", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankMissing)
if self.check_pics("RVCOPSTATE.S.M.ERR_WATER_TANK_LID_OPEN"):
test_step = "Manually put the device in the water tank lid open error state"
self.print_step("7u", test_step)
self.print_step("7u", "Manually put the device in the water tank lid open error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "WaterTankLidOpen"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7v", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kWaterTankLidOpen)
if self.check_pics("RVCOPSTATE.S.M.ERR_MOP_CLEANING_PAD_MISSING"):
test_step = "Manually put the device in the mop cleaning pad missing error state"
self.print_step("7w", test_step)
self.print_step("7w", "Manually put the device in the mop cleaning pad missing error state")
if self.is_ci:
self.write_to_app_pipe('{"Name": "ErrorEvent", "Error": "MopCleaningPadMissing"}')
else:
self.wait_for_user_input(prompt_msg=f"{test_step}, and press Enter when done.\n")
input("Press Enter when done.\n")
await self.read_and_validate_operror(step="7x", expected_error=Clusters.RvcOperationalState.Enums.ErrorStateEnum.kMopCleaningPadMissing)


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 80aa98b

Please sign in to comment.