Skip to content

Commit

Permalink
fix restyled issue
Browse files Browse the repository at this point in the history
  • Loading branch information
asirko-soft committed Feb 25, 2025
1 parent 33a3752 commit 683b602
Showing 1 changed file with 44 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
if TYPE_CHECKING:
from chip.testing.matter_testing import MatterBaseTest

EndpointCheckFunction = Callable[[Clusters.Attribute.AsyncReadTransaction.ReadResponse, int], bool]
EndpointCheckFunction = Callable[[
Clusters.Attribute.AsyncReadTransaction.ReadResponse, int], bool]


def _has_cluster(wildcard: Clusters.Attribute.AsyncReadTransaction.ReadResponse, endpoint: int, cluster: ClusterObjects.Cluster) -> bool:
Expand Down Expand Up @@ -95,21 +96,21 @@ def _has_attribute(wildcard: Clusters.Attribute.AsyncReadTransaction.ReadRespons
KeyError: If attribute's cluster_id is not found in ALL_CLUSTERS
"""
cluster = ClusterObjects.ALL_CLUSTERS[attribute.cluster_id]

if endpoint not in wildcard.attributes:
return False

if cluster not in wildcard.attributes[endpoint]:
return False

if cluster.Attributes.AttributeList not in wildcard.attributes[endpoint][cluster]:
return False

attr_list = wildcard.attributes[endpoint][cluster][cluster.Attributes.AttributeList]
if not isinstance(attr_list, list):
raise ValueError(
f"Failed to read mandatory AttributeList attribute value for cluster {cluster} on endpoint {endpoint}: {attr_list}.")

return attribute.attribute_id in attr_list


Expand Down Expand Up @@ -153,21 +154,21 @@ def _has_command(wildcard: Clusters.Attribute.AsyncReadTransaction.ReadResponse,
KeyError: If command's cluster_id is not found in ALL_CLUSTERS
"""
cluster = ClusterObjects.ALL_CLUSTERS[command.cluster_id]

if endpoint not in wildcard.attributes:
return False

if cluster not in wildcard.attributes[endpoint]:
return False

if cluster.Attributes.AcceptedCommandList not in wildcard.attributes[endpoint][cluster]:
return False

cmd_list = wildcard.attributes[endpoint][cluster][cluster.Attributes.AcceptedCommandList]
if not isinstance(cmd_list, list):
raise ValueError(
f"Failed to read mandatory AcceptedCommandList command value for cluster {cluster} on endpoint {endpoint}: {cmd_list}.")

return command.command_id in cmd_list


Expand Down Expand Up @@ -197,18 +198,18 @@ def has_command(command: ClusterObjects.ClusterCommand) -> EndpointCheckFunction
def _has_feature(wildcard: Clusters.Attribute.AsyncReadTransaction.ReadResponse, endpoint: int, cluster: ClusterObjects.ClusterObjectDescriptor, feature: IntFlag) -> bool:
if endpoint not in wildcard.attributes:
return False

if cluster not in wildcard.attributes[endpoint]:
return False

if cluster.Attributes.FeatureMap not in wildcard.attributes[endpoint][cluster]:
return False

feature_map = wildcard.attributes[endpoint][cluster][cluster.Attributes.FeatureMap]
if not isinstance(feature_map, int):
raise ValueError(
f"Failed to read mandatory FeatureMap attribute value for cluster {cluster} on endpoint {endpoint}: {feature_map}.")

return (feature & feature_map) != 0


Expand Down Expand Up @@ -236,7 +237,8 @@ def has_feature(cluster: ClusterObjects.ClusterObjectDescriptor, feature: IntFla


def _async_runner(body, test_instance, *args, **kwargs):
timeout = getattr(test_instance.matter_test_config, 'timeout', None) or test_instance.default_timeout
timeout = getattr(test_instance.matter_test_config,
'timeout', None) or test_instance.default_timeout
return test_instance.event_loop.run_until_complete(asyncio.wait_for(body(test_instance, *args, **kwargs), timeout=timeout))


Expand All @@ -256,7 +258,8 @@ def async_runner(self: "MatterBaseTest", *args, **kwargs):
async def _get_all_matching_endpoints(test_instance, accept_function: EndpointCheckFunction) -> list[int]:
""" Returns a list of endpoints matching the accept condition. """
wildcard = await test_instance.default_controller.Read(test_instance.dut_node_id, [(Clusters.Descriptor), Attribute.AttributePath(None, None, GlobalAttributeIds.ATTRIBUTE_LIST_ID), Attribute.AttributePath(None, None, GlobalAttributeIds.FEATURE_MAP_ID), Attribute.AttributePath(None, None, GlobalAttributeIds.ACCEPTED_COMMAND_LIST_ID)])
matching = [e for e in wildcard.attributes.keys() if accept_function(wildcard, e)]
matching = [e for e in wildcard.attributes.keys()
if accept_function(wildcard, e)]
return matching


Expand Down Expand Up @@ -286,19 +289,25 @@ def matching_runner(self: "MatterBaseTest", *args, **kwargs):
from chip.testing.matter_testing import MatterBaseTest
assert isinstance(self, MatterBaseTest)

runner_with_timeout = asyncio.wait_for(_get_all_matching_endpoints(self, accept_function), timeout=30)
runner_with_timeout = asyncio.wait_for(
_get_all_matching_endpoints(self, accept_function), timeout=30)
matching = self.event_loop.run_until_complete(runner_with_timeout)
asserts.assert_less_equal(len(matching), 1, "More than one matching endpoint found for singleton test.")
asserts.assert_less_equal(
len(matching), 1, "More than one matching endpoint found for singleton test.")
if not matching:
logging.info("Test is not applicable to any endpoint - skipping test")
logging.info(
"Test is not applicable to any endpoint - skipping test")
asserts.skip('No endpoint matches test requirements')
return
try:
old_endpoint = self.matter_test_config.endpoint
self.matter_test_config.endpoint = matching[0]
logging.info(f'Running test on endpoint {self.matter_test_config.endpoint}')
timeout = getattr(self.matter_test_config, 'timeout', None) or self.default_timeout
self.event_loop.run_until_complete(asyncio.wait_for(body(self, *args, **kwargs), timeout=timeout))
logging.info(
f'Running test on endpoint {self.matter_test_config.endpoint}')
timeout = getattr(self.matter_test_config,
'timeout', None) or self.default_timeout
self.event_loop.run_until_complete(asyncio.wait_for(
body(self, *args, **kwargs), timeout=timeout))
finally:
self.matter_test_config.endpoint = old_endpoint
return matching_runner
Expand Down Expand Up @@ -333,14 +342,20 @@ def run_if_endpoint_matches(accept_function: EndpointCheckFunction):
"""
def run_if_endpoint_matches_internal(body):
def per_endpoint_runner(test_instance, *args, **kwargs):
runner_with_timeout = asyncio.wait_for(should_run_test_on_endpoint(test_instance, accept_function), timeout=60)
should_run_test = test_instance.event_loop.run_until_complete(runner_with_timeout)
runner_with_timeout = asyncio.wait_for(
should_run_test_on_endpoint(test_instance, accept_function), timeout=60)
should_run_test = test_instance.event_loop.run_until_complete(
runner_with_timeout)
if not should_run_test:
logging.info("Test is not applicable to this endpoint - skipping test")
logging.info(
"Test is not applicable to this endpoint - skipping test")
asserts.skip('Endpoint does not match test requirements')
return
logging.info(f'Running test on endpoint {test_instance.matter_test_config.endpoint}')
timeout = getattr(test_instance.matter_test_config, 'timeout', None) or test_instance.default_timeout
test_instance.event_loop.run_until_complete(asyncio.wait_for(body(test_instance, *args, **kwargs), timeout=timeout))
logging.info(
f'Running test on endpoint {test_instance.matter_test_config.endpoint}')
timeout = getattr(test_instance.matter_test_config,
'timeout', None) or test_instance.default_timeout
test_instance.event_loop.run_until_complete(asyncio.wait_for(
body(test_instance, *args, **kwargs), timeout=timeout))
return per_endpoint_runner
return run_if_endpoint_matches_internal

0 comments on commit 683b602

Please sign in to comment.