Skip to content

277 load cell validity check #379

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions opensourceleg/math/math.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"clamp_within_vector_range",
"from_twos_complement",
"to_twos_complement",
"Counter",
]


Expand Down Expand Up @@ -320,3 +321,27 @@ def from_twos_complement(value: int, bit_length: int) -> int:
return int(value - (2**bit_length))
else:
return int(value)


class Counter:
"""
A simple counter class that increments a counter each time the increment_counter argument is set true.
To reset the counter, call update with increment_counter set to false.

Author: Kevin Best, 9/25/2024
https://github.com/tkevinbest
"""

def __init__(self) -> None:
self._count: int = 0

def update(self, increment_counter: bool) -> None:
if increment_counter:
self._count += 1
else:
self._count = 0

@property
def current_count(self) -> int:
"""Returns the current count"""
return self._count
49 changes: 46 additions & 3 deletions opensourceleg/sensors/loadcell.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from smbus2 import SMBus

from opensourceleg.logging import LOGGER
from opensourceleg.math.math import Counter
from opensourceleg.sensors.adc import ADS131M0x
from opensourceleg.sensors.base import LoadcellBase

Expand All @@ -50,6 +51,25 @@ def __init__(self, message: str = "Load cell unresponsive.") -> None:
super().__init__(message)


class LoadcellBrokenWireDetectedException(Exception):
"""
Exception raised when a broken wire is detected in the load cell.
Indicated by saturated ADC values (0 or 4095).

Attributes:
message (str): Description of the error.
"""

def __init__(self, message: str = "Load cell broken wire detected.") -> None:
"""
Initialize the LoadcellBrokenWireDetectedException.

Args:
message (str, optional): Error message. Defaults to "Load cell broken wire detected.".
"""
super().__init__(message)


class DEPHY_AMPLIFIER_MEMORY_CHANNELS(int, Enum):
"""
Enumeration of memory channel addresses used by the load cell.
Expand Down Expand Up @@ -97,6 +117,7 @@ def __init__(
bus: int = 1,
i2c_address: int = 0x66,
offline: bool = False,
enable_diagnostics: bool = True,
) -> None:
"""
Initialize the Dephy loadcell amplifier.
Expand Down Expand Up @@ -145,6 +166,11 @@ def __init__(
self._zero_calibration_offset: npt.NDArray[np.double] = self._calibration_offset
self._is_calibrated: bool = False
self._is_streaming: bool = False
self._enable_diagnostics: bool = enable_diagnostics
self._data_potentially_invalid: bool = False
if self._enable_diagnostics:
self._diagnostics_counter = Counter()
self._num_broken_wire_pre_exception: int = 5

def start(self) -> None:
"""
Expand All @@ -169,7 +195,7 @@ def reset(self) -> None:
def update(
self,
calibration_offset: Optional[npt.NDArray[np.double]] = None,
data_callback: Optional[Callable[..., npt.NDArray[np.uint8]]] = None,
data_callback: Optional[Callable[..., npt.NDArray[np.uint16]]] = None,
) -> None:
"""
Query the load cell for the latest data and update internal state.
Expand All @@ -189,6 +215,9 @@ def update(
"""
data = data_callback() if data_callback else self._read_compressed_strain()

if self._enable_diagnostics:
self.check_data(data)

if calibration_offset is None:
calibration_offset = self._calibration_offset

Expand All @@ -198,11 +227,25 @@ def update(
# Process the data using the calibration matrix and subtract the offset.
self._data = np.transpose(a=self._calibration_matrix.dot(b=np.transpose(a=coupled_data))) - calibration_offset

def check_data(self, data: npt.NDArray[np.uint16]) -> None:
"""
Watches raw values from the load cell to try to catch broken wires.
Symptom is indicated by saturation at either max or min ADC values.
"""
ADC_saturated_high = np.any(data == self.ADC_RANGE) # Use np.any for NumPy arrays
ADC_saturated_low = np.any(data == 0) # Use np.any for NumPy arrays
concerning_data_found = bool(ADC_saturated_high or ADC_saturated_low)
self._diagnostics_counter.update(concerning_data_found)
if self._diagnostics_counter.current_count >= self._num_broken_wire_pre_exception:
raise LoadcellBrokenWireDetectedException(
f"[{self.__repr__()}] Consistent saturation in readings, check wiring. ADC values: {self._data}."
)

def calibrate(
self,
number_of_iterations: int = 2000,
reset: bool = False,
data_callback: Optional[Callable[[], npt.NDArray[np.uint8]]] = None,
data_callback: Optional[Callable[[], npt.NDArray[np.uint16]]] = None,
) -> None:
"""
Perform a zeroing (calibration) routine for the load cell.
Expand Down Expand Up @@ -258,7 +301,7 @@ def stop(self) -> None:
if hasattr(self, "_smbus"):
self._smbus.close()

def _read_compressed_strain(self) -> Any:
def _read_compressed_strain(self) -> npt.NDArray[np.uint16]:
"""
Read and unpack compressed strain data from the sensor.

Expand Down
36 changes: 36 additions & 0 deletions tests/test_sensors/test_loadcell.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ def test_SRILoadcell_update():
assert np.array_equal(SRI._data, data)


def test_SRILoadcell_BrokenWire_high():
# Test basic call execution
SRI = loadcell.DephyLoadcellAmplifier(calibration_matrix=DEFAULT_CAL_MATRIX)
for _i in range(SRI._num_broken_wire_pre_exception - 1):
SRI.update(data_callback=_read_data_high)
# Assert that the broken wire condition raises an exception after correct number of iterations

with pytest.raises(loadcell.LoadcellBrokenWireDetectedException):
SRI.update(data_callback=_read_data_high)


def test_SRILoadcell_BrokenWire_low():
# Test basic call execution
SRI = loadcell.DephyLoadcellAmplifier(calibration_matrix=DEFAULT_CAL_MATRIX)
for _i in range(SRI._num_broken_wire_pre_exception - 1):
SRI.update(data_callback=_read_data_low)
# Assert that the broken wire condition raises an exception after correct number of iterations

with pytest.raises(loadcell.LoadcellBrokenWireDetectedException):
SRI.update(data_callback=_read_data_low)


def test_SRILoadcell_calibrate():
# Test reset, else statement

Expand All @@ -86,6 +108,20 @@ def _read_data() -> npt.NDArray[np.uint8]:
return np.ones(shape=(1, 6))


# Function to bypass _read_compressed_strain() with an array of ones with one broken wire pulled high
def _read_data_high() -> npt.NDArray[np.uint8]:
data = np.ones(shape=(1, 6))
data[0, 2] = loadcell.DephyLoadcellAmplifier.ADC_RANGE
return data


# Function to bypass _read_compressed_strain() with an array of ones with one broken wire pulled low
def _read_data_low() -> npt.NDArray[np.uint8]:
data = np.ones(shape=(1, 6))
data[0, 2] = 0
return data


# Function to bypass _read_compressed_strain() with random data
def _read_random_data() -> npt.NDArray[np.uint8]:
return np.random.randint(low=0, high=255, size=(1, 6), dtype=np.uint8)
Expand Down