From a102801235c4b453736b0797fab0b34ff411da73 Mon Sep 17 00:00:00 2001 From: lgomez Date: Fri, 22 Nov 2024 13:06:10 -0600 Subject: [PATCH] -Add crc8 for python --- .../interfaces/protocols/crc_protocol.py | 8 +- .../interfaces/protocols/test_crc_protocol.py | 141 +++++++++++++++++- 2 files changed, 147 insertions(+), 2 deletions(-) diff --git a/openc3/python/openc3/interfaces/protocols/crc_protocol.py b/openc3/python/openc3/interfaces/protocols/crc_protocol.py index a3d172d27c..01aea055bd 100644 --- a/openc3/python/openc3/interfaces/protocols/crc_protocol.py +++ b/openc3/python/openc3/interfaces/protocols/crc_protocol.py @@ -18,7 +18,7 @@ from openc3.interfaces.protocols.protocol import Protocol from openc3.accessors.binary_accessor import BinaryAccessor from openc3.utilities.logger import Logger -from openc3.utilities.crc import Crc16, Crc32, Crc64 +from openc3.utilities.crc import Crc8, Crc16, Crc32, Crc64 # Creates a CRC on write and verifies a CRC on read @@ -120,6 +120,12 @@ def __init__( if self.endianness == "BIG_ENDIAN": endianness = ">" match self.bit_size: + case 8: + self.pack = f"{endianness}B" + if len(args) == 0: + self.crc = Crc8() + else: + self.crc = Crc8(*args) case 16: self.pack = f"{endianness}H" if len(args) == 0: diff --git a/openc3/python/test/interfaces/protocols/test_crc_protocol.py b/openc3/python/test/interfaces/protocols/test_crc_protocol.py index 1d21d93330..de66d4f326 100644 --- a/openc3/python/test/interfaces/protocols/test_crc_protocol.py +++ b/openc3/python/test/interfaces/protocols/test_crc_protocol.py @@ -23,7 +23,7 @@ from openc3.interfaces.protocols.burst_protocol import BurstProtocol from openc3.packets.packet import Packet from openc3.streams.stream import Stream -from openc3.utilities.crc import Crc16, Crc32, Crc64 +from openc3.utilities.crc import Crc16, Crc32, Crc64, Crc8 class TestCrcProtocol(unittest.TestCase): @@ -347,6 +347,33 @@ def test_does_nothing_if_protocol_added_as_write(self): self.assertEqual(len(packet.buffer), 8) self.assertEqual(packet.buffer, TestCrcProtocol.buffer) + def test_reads_the_8_bit_crc_field_and_compares_to_the_crc(self): + self.interface.stream = TestCrcProtocol.CrcStream() + self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") + self.interface.add_protocol( + CrcProtocol, + [ + "CRC", # item name + "FALSE", # strip crc + "ERROR", # bad strategy + -8, # bit offset + 8, + ], # bit size + "READ_WRITE", + ) + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "PKT") + packet.append_item("DATA", 32, "UINT") + packet.append_item("CRC", 8, "UINT") + + TestCrcProtocol.buffer = b"\x00\x01\x02\x03" + crc = Crc8().calc(TestCrcProtocol.buffer) + TestCrcProtocol.buffer += struct.pack(">B", crc) # [crc].pack("n") + + packet = self.interface.read() + self.assertEqual(len(packet.buffer), 5) + self.assertEqual(packet.buffer, TestCrcProtocol.buffer) + def test_reads_the_16_bit_crc_field_and_compares_to_the_crc(self): self.interface.stream = TestCrcProtocol.CrcStream() self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") @@ -433,6 +460,39 @@ def test_reads_the_64_bit_crc_field_and_compares_to_the_crc(self): # context "with a specified CRC poly, seed, xor, and reflect" + def test_reads_the_8_bit_crc_field_and_compares_to_the_crc2(self): + self.interface.stream = TestCrcProtocol.CrcStream() + self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") + self.interface.add_protocol( + CrcProtocol, + [ + "CRC", # item name + "FALSE", # strip crc + "ERROR", # bad strategy + -8, # bit offset + 8, # bit size + "BIG_ENDIAN", # endianness + 0x39, # poly for CRC-8/DARC + 0x0, # seed + "TRUE", # xor + "TRUE", # reflect + ], + "READ_WRITE", + ) + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "PKT") + packet.append_item("DATA", 32, "UINT") + packet.append_item("CRC", 8, "UINT") + + TestCrcProtocol.buffer = b"\x00\x01\x02\x03" + crc8 = Crc8(0x39, 0, True, True) + crc = crc8.calc(TestCrcProtocol.buffer) + TestCrcProtocol.buffer += struct.pack(">B", crc) + + packet = self.interface.read() + self.assertEqual(len(packet.buffer), 5) + self.assertEqual(packet.buffer, TestCrcProtocol.buffer) + def test_reads_the_16_bit_crc_field_and_compares_to_the_crc2(self): self.interface.stream = TestCrcProtocol.CrcStream() self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") @@ -606,6 +666,33 @@ def test_disconnects_if_the_crc_does_not_match(self): stdout.getvalue(), ) + def test_can_strip_the_8_bit_crc_at_the_end(self): + self.interface.stream = TestCrcProtocol.CrcStream() + self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") + self.interface.add_protocol( + CrcProtocol, + [ + "CRC", # item name + "TRUE", # strip crc + "ERROR", # bad strategy + -8, # bit offset + 8, + ], # bit size + "READ_WRITE", + ) + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "PKT") + packet.append_item("DATA", 32, "UINT") + packet.append_item("CRC", 8, "UINT") + + TestCrcProtocol.buffer = b"\x00\x01\x02\x03" + crc = Crc8().calc(TestCrcProtocol.buffer) + TestCrcProtocol.buffer += struct.pack(">B", crc) + + packet = self.interface.read() + self.assertEqual(len(packet.buffer), 4) + self.assertEqual(packet.buffer, TestCrcProtocol.buffer[0:4]) + def test_can_strip_the_16_bit_crc_at_the_end(self): self.interface.stream = TestCrcProtocol.CrcStream() self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") @@ -768,6 +855,32 @@ def test_complains_if_the_item_does_not_exist(self): ): self.interface.write(packet) + def test_calculates_and_writes_the_8_bit_crc_item(self): + self.interface.stream = TestCrcProtocol.CrcStream() + self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") + self.interface.add_protocol( + CrcProtocol, + [ + "CRC", # item name + "FALSE", # strip crc + "ERROR", # bad strategy + -40, # bit offset + 8, + ], # bit size + "READ_WRITE", + ) + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "PKT") + packet.append_item("DATA", 32, "UINT") + packet.append_item("CRC", 8, "UINT") + packet.append_item("TRAILER", 32, "UINT") + packet.buffer = b"\x00\x01\x02\x03\x3F\x04\x05\x06\x07" + self.interface.write(packet) + buffer = b"\x00\x01\x02\x03" + buffer += struct.pack(">B", Crc8().calc(b"\x00\x01\x02\x03")) + buffer += b"\x04\x05\x06\x07" + self.assertEqual(TestCrcProtocol.buffer, buffer) + def test_calculates_and_writes_the_16_bit_crc_item(self): self.interface.stream = TestCrcProtocol.CrcStream() self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") @@ -852,6 +965,32 @@ def test_calculates_and_writes_the_64_bit_crc_item(self): buffer += b"\x04\x05\x06\x07" self.assertEqual(TestCrcProtocol.buffer, buffer) + def test_appends_the_8_bit_crc_to_the_end(self): + self.interface.stream = TestCrcProtocol.CrcStream() + self.interface.add_protocol(BurstProtocol, [], "READ_WRITE") + self.interface.add_protocol( + CrcProtocol, + [ + None, # item name None means append + "FALSE", # strip crc + "ERROR", # bad strategy + -8, # bit offset + 8, + ], # bit size + "READ_WRITE", + ) + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "PKT") + packet.append_item("DATA", 32, "UINT") + packet.append_item("CRC", 32, "UINT") + packet.append_item("TRAILER", 32, "UINT") + packet.buffer = b"\x00\x01\x02\x03\x00\x00\x00\x00\x04\x05\x06\x07" + buffer = packet.buffer + buffer += struct.pack(">B", Crc8().calc(packet.buffer)) + self.interface.write(packet) + self.assertEqual(len(TestCrcProtocol.buffer), 13) + self.assertEqual(TestCrcProtocol.buffer, buffer) + def test_appends_the_16_bit_crc_to_the_end(self): self.interface.stream = TestCrcProtocol.CrcStream() self.interface.add_protocol(BurstProtocol, [], "READ_WRITE")