From 0ca7b8541f134a28a3c464329f0991da47f0c6da Mon Sep 17 00:00:00 2001 From: tkevinbest <70407790+tkevinbest@users.noreply.github.com> Date: Wed, 25 Sep 2024 21:10:23 +0000 Subject: [PATCH 1/3] Initial commit of load cell validity check. --- opensourceleg/sensors/loadcell.py | 25 +++++++++++++++++++++++++ opensourceleg/utilities/utilities.py | 21 +++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/opensourceleg/sensors/loadcell.py b/opensourceleg/sensors/loadcell.py index 348f9e4f..33b17aa1 100644 --- a/opensourceleg/sensors/loadcell.py +++ b/opensourceleg/sensors/loadcell.py @@ -10,6 +10,7 @@ from opensourceleg.logging import LOGGER from opensourceleg.sensors.base import LoadcellBase +from opensourceleg.utilities.utilities import Counter class LoadcellNotRespondingException(Exception): @@ -43,6 +44,7 @@ def __init__( calibration_matrix=None, bus: int = 1, i2c_address: int = 0x66, + enable_diagnostics = True, ) -> None: self._amp_gain: float = amp_gain self._exc: float = exc @@ -62,6 +64,10 @@ def __init__( self._zero_calibration_offset: npt.NDArray[np.double] = self._calibration_offset self._is_calibrated: bool = False self._is_streaming: bool = False + self._enable_diagnostics: bool = enable_diagnostics + self._data_potentially_invalid: bool = False + if self._enable_diagnostics: + self._diagnostics_counter = Counter() def start(self) -> None: if self._bus or self._i2c_address is None: @@ -96,6 +102,25 @@ def update( np.transpose(a=self._calibration_matrix.dot(b=np.transpose(a=coupled_data))) - calibration_offset ) + if self._enable_diagnostics: + self.check_data() + + def check_data( + self, + ) -> None: + """ + Watches raw values from the load cell to try to catch broken wires. + """ + ADC_saturated_high = any(self._data == self.ADC_RANGE) + ADC_saturated_low = any(self._data == 0) + concerning_data_found = ADC_saturated_high or ADC_saturated_low + self._diagnostics_counter.update(concerning_data_found) + if self._diagnostics_counter.current_count >= 5: + self._data_potentially_invalid = True + + @property + def data_potentially_invalid(self): + return self._data_potentially_invalid def calibrate( self, diff --git a/opensourceleg/utilities/utilities.py b/opensourceleg/utilities/utilities.py index d0d01a6d..61418198 100644 --- a/opensourceleg/utilities/utilities.py +++ b/opensourceleg/utilities/utilities.py @@ -37,3 +37,24 @@ def get_ctype(token): raise Exception("Unknown type: " + token) return out + +class Counter: + """ + A simple counter class that increments a counter each time the increment_counter argument is set true. + To reset the counter, call update with increment_counter set to false. + + Author: Kevin Best, 9/25/2024 + https://github.com/tkevinbest + """ + def __init__(self): + self._count: int = 0 + + def update(self, increment_counter:bool): + if increment_counter: + self._count += 1 + else: + self._count = 0 + + @property + def current_count(self): + return self._count From 21fbd7df45143ea9cf2e5941113117ca8cdcca5d Mon Sep 17 00:00:00 2001 From: tkevinbest <70407790+tkevinbest@users.noreply.github.com> Date: Tue, 8 Apr 2025 21:48:59 +0000 Subject: [PATCH 2/3] feat: add exception for broken load cell wire detected --- opensourceleg/sensors/loadcell.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/opensourceleg/sensors/loadcell.py b/opensourceleg/sensors/loadcell.py index 77180016..17092af2 100644 --- a/opensourceleg/sensors/loadcell.py +++ b/opensourceleg/sensors/loadcell.py @@ -48,6 +48,25 @@ def __init__(self, message: str = "Load cell unresponsive.") -> None: super().__init__(message) +class LoadcellBrokenWireDetectedException(Exception): + """ + Exception raised when a broken wire is detected in the load cell. + Indicated by saturated ADC values (0 or 4095). + + Attributes: + message (str): Description of the error. + """ + + def __init__(self, message: str = "Load cell broken wire detected.") -> None: + """ + Initialize the LoadcellBrokenWireDetectedException. + + Args: + message (str, optional): Error message. Defaults to "Load cell broken wire detected.". + """ + super().__init__(message) + + class DEPHY_AMPLIFIER_MEMORY_CHANNELS(int, Enum): """ Enumeration of memory channel addresses used by the load cell. @@ -203,20 +222,16 @@ def check_data( ) -> None: """ Watches raw values from the load cell to try to catch broken wires. + Symptom is indicated by saturation at either max or min ADC values. """ ADC_saturated_high = any(self._data == self.ADC_RANGE) ADC_saturated_low = any(self._data == 0) concerning_data_found = ADC_saturated_high or ADC_saturated_low self._diagnostics_counter.update(concerning_data_found) if self._diagnostics_counter.current_count >= 5: - self._data_potentially_invalid = True - - @property - def data_potentially_invalid(self) -> bool: - """ - Check if the data from the load cell is potentially invalid, - indicated by saturation at either max or min ADC values.""" - return self._data_potentially_invalid + raise LoadcellBrokenWireDetectedException( + f"[{self.__repr__()}] Consistent saturation in readings, check wiring. " f"ADC values: {self._data}. " + ) def calibrate( self, From 3b53a2594731fa2a4cd82b027d1c45700650b864 Mon Sep 17 00:00:00 2001 From: tkevinbest <70407790+tkevinbest@users.noreply.github.com> Date: Wed, 9 Apr 2025 13:21:20 +0000 Subject: [PATCH 3/3] misc: add unit tests for loadcell broken wire check. fix: type annotations in loadcell --- opensourceleg/sensors/loadcell.py | 28 +++++++++++----------- tests/test_sensors/test_loadcell.py | 36 +++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/opensourceleg/sensors/loadcell.py b/opensourceleg/sensors/loadcell.py index 17092af2..174e6e27 100644 --- a/opensourceleg/sensors/loadcell.py +++ b/opensourceleg/sensors/loadcell.py @@ -19,7 +19,7 @@ import time from enum import Enum -from typing import Any, Callable, Optional +from typing import Callable, Optional import numpy as np import numpy.typing as npt @@ -165,6 +165,7 @@ def __init__( self._data_potentially_invalid: bool = False if self._enable_diagnostics: self._diagnostics_counter = Counter() + self._num_broken_wire_pre_exception: int = 5 def start(self) -> None: """ @@ -189,7 +190,7 @@ def reset(self) -> None: def update( self, calibration_offset: Optional[npt.NDArray[np.double]] = None, - data_callback: Optional[Callable[..., npt.NDArray[np.uint8]]] = None, + data_callback: Optional[Callable[..., npt.NDArray[np.uint16]]] = None, ) -> None: """ Query the load cell for the latest data and update internal state. @@ -206,6 +207,9 @@ def update( """ data = data_callback() if data_callback else self._read_compressed_strain() + if self._enable_diagnostics: + self.check_data(data) + if calibration_offset is None: calibration_offset = self._calibration_offset @@ -214,30 +218,26 @@ def update( # Process the data using the calibration matrix and subtract the offset. self._data = np.transpose(a=self._calibration_matrix.dot(b=np.transpose(a=coupled_data))) - calibration_offset - if self._enable_diagnostics: - self.check_data() - def check_data( - self, - ) -> None: + def check_data(self, data: npt.NDArray[np.uint16]) -> None: """ Watches raw values from the load cell to try to catch broken wires. Symptom is indicated by saturation at either max or min ADC values. """ - ADC_saturated_high = any(self._data == self.ADC_RANGE) - ADC_saturated_low = any(self._data == 0) - concerning_data_found = ADC_saturated_high or ADC_saturated_low + ADC_saturated_high = np.any(data == self.ADC_RANGE) # Use np.any for NumPy arrays + ADC_saturated_low = np.any(data == 0) # Use np.any for NumPy arrays + concerning_data_found = bool(ADC_saturated_high or ADC_saturated_low) self._diagnostics_counter.update(concerning_data_found) - if self._diagnostics_counter.current_count >= 5: + if self._diagnostics_counter.current_count >= self._num_broken_wire_pre_exception: raise LoadcellBrokenWireDetectedException( - f"[{self.__repr__()}] Consistent saturation in readings, check wiring. " f"ADC values: {self._data}. " + f"[{self.__repr__()}] Consistent saturation in readings, check wiring. ADC values: {self._data}." ) def calibrate( self, number_of_iterations: int = 2000, reset: bool = False, - data_callback: Optional[Callable[[], npt.NDArray[np.uint8]]] = None, + data_callback: Optional[Callable[[], npt.NDArray[np.uint16]]] = None, ) -> None: """ Perform a zeroing (calibration) routine for the load cell. @@ -293,7 +293,7 @@ def stop(self) -> None: if hasattr(self, "_smbus"): self._smbus.close() - def _read_compressed_strain(self) -> Any: + def _read_compressed_strain(self) -> npt.NDArray[np.uint16]: """ Read and unpack compressed strain data from the sensor. diff --git a/tests/test_sensors/test_loadcell.py b/tests/test_sensors/test_loadcell.py index bdb56a9c..93ccec63 100644 --- a/tests/test_sensors/test_loadcell.py +++ b/tests/test_sensors/test_loadcell.py @@ -75,6 +75,28 @@ def test_SRILoadcell_update(): assert np.array_equal(SRI._data, data) +def test_SRILoadcell_BrokenWire_high(): + # Test basic call execution + SRI = loadcell.DephyLoadcellAmplifier(calibration_matrix=DEFAULT_CAL_MATRIX) + for _i in range(SRI._num_broken_wire_pre_exception - 1): + SRI.update(data_callback=_read_data_high) + # Assert that the broken wire condition raises an exception after correct number of iterations + + with pytest.raises(loadcell.LoadcellBrokenWireDetectedException): + SRI.update(data_callback=_read_data_high) + + +def test_SRILoadcell_BrokenWire_low(): + # Test basic call execution + SRI = loadcell.DephyLoadcellAmplifier(calibration_matrix=DEFAULT_CAL_MATRIX) + for _i in range(SRI._num_broken_wire_pre_exception - 1): + SRI.update(data_callback=_read_data_low) + # Assert that the broken wire condition raises an exception after correct number of iterations + + with pytest.raises(loadcell.LoadcellBrokenWireDetectedException): + SRI.update(data_callback=_read_data_low) + + def test_SRILoadcell_calibrate(): # Test reset, else statement @@ -86,6 +108,20 @@ def _read_data() -> npt.NDArray[np.uint8]: return np.ones(shape=(1, 6)) +# Function to bypass _read_compressed_strain() with an array of ones with one broken wire pulled high +def _read_data_high() -> npt.NDArray[np.uint8]: + data = np.ones(shape=(1, 6)) + data[0, 2] = loadcell.DephyLoadcellAmplifier.ADC_RANGE + return data + + +# Function to bypass _read_compressed_strain() with an array of ones with one broken wire pulled low +def _read_data_low() -> npt.NDArray[np.uint8]: + data = np.ones(shape=(1, 6)) + data[0, 2] = 0 + return data + + # Function to bypass _read_compressed_strain() with random data def _read_random_data() -> npt.NDArray[np.uint8]: return np.random.randint(low=0, high=255, size=(1, 6), dtype=np.uint8)