From 3fc52683f0381c667bdc5ff434bb2730b8f08a28 Mon Sep 17 00:00:00 2001 From: Mathieu Payrol Date: Mon, 20 Jun 2022 18:33:19 +0000 Subject: [PATCH] Handle concurrent requests senquentially --- intex_spa/intex_spa.py | 89 ++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/intex_spa/intex_spa.py b/intex_spa/intex_spa.py index e12ac64..ff880bf 100644 --- a/intex_spa/intex_spa.py +++ b/intex_spa/intex_spa.py @@ -44,6 +44,7 @@ def __init__(self, address: str = "SPA_DEVICE", port: str = "8990"): self.status = IntexSpaStatus() self.last_successful_update_ms: int = None self.is_available: bool = None + self._semaphore = asyncio.Semaphore(1) async def _async_handle_intent( self, intent: str = "status", expected_state: typing.Union[bool, int] = None @@ -82,49 +83,51 @@ async def _async_handle_intent( ) await self.async_update_status() - if ( - # the provided intent is an update status - intent == "status" - # the provided intent is a command and its expected_state differs from the current state - or getattr(self.status, intent) != expected_state - ): - _LOGGER.debug("'%s' intent: a spa query is needed", intent) - - # Attempt maximum 5 times - for _ in range(5): - try: - _LOGGER.debug("'%s' intent: new spa query", intent) - # Initialize a query to the spa - query = IntexSpaQuery(intent, expected_state) - - # Send the raw request bytes via the network object - await self.network.async_send(query.request_bytes) - - # Receive the raw response bytes via the network object - received_bytes = await self.network.async_receive() - # Give the raw received_bytes back to the query object to render the new status - query.render_response_status(received_bytes) - _LOGGER.debug("'%s' intent: new status is rendered", intent) - # And update the status object with it - self.status = query.response_status - - # Set availability info - self.last_successful_update_ms = int(time.time() * 1000) - self.is_available = True - except ( - asyncio.TimeoutError, - asyncio.IncompleteReadError, - AssertionError, - ): - _LOGGER.warning("Exception raised during spa querying") - await asyncio.sleep(2) - continue - else: - break - else: # No retry has succeeded - # Set unavailability info - self.status = IntexSpaStatus() - self.is_available = False + # Run concurrent requests senquentially + async with self._semaphore: + if ( + # the provided intent is an update status + intent == "status" + # the provided intent is a command and its expected_state differs from the current state + or getattr(self.status, intent) != expected_state + ): + _LOGGER.debug("'%s' intent: a spa query is needed", intent) + + # Attempt maximum 5 times + for _ in range(5): + try: + _LOGGER.debug("'%s' intent: new spa query", intent) + # Initialize a query to the spa + query = IntexSpaQuery(intent, expected_state) + + # Send the raw request bytes via the network object + await self.network.async_send(query.request_bytes) + + # Receive the raw response bytes via the network object + received_bytes = await self.network.async_receive() + # Give the raw received_bytes back to the query object to render the new status + query.render_response_status(received_bytes) + _LOGGER.debug("'%s' intent: new status is rendered", intent) + # And update the status object with it + self.status = query.response_status + + # Set availability info + self.last_successful_update_ms = int(time.time() * 1000) + self.is_available = True + except ( + asyncio.TimeoutError, + asyncio.IncompleteReadError, + AssertionError, + ): + _LOGGER.warning("Exception raised during spa querying") + await asyncio.sleep(2) + continue + else: + break + else: # No retry has succeeded + # Set unavailability info + self.status = IntexSpaStatus() + self.is_available = False return self.status