From 13d3cd232cb9953df7684f449973cc8e03c31941 Mon Sep 17 00:00:00 2001 From: Jason Thomas Date: Sat, 8 Feb 2025 11:25:46 -0700 Subject: [PATCH] Fix python unit tests --- .github/workflows/api_tests.yml | 8 +- .../scripts/run_script.py | 11 +- .../scripts/running_script.py | 25 +- .../test/scripts/__init__.py | 0 openc3-ruby/Dockerfile-ubi | 5 +- openc3/python/openc3/packets/packet.py | 5 +- openc3/python/openc3/packets/telemetry.py | 190 +----------- openc3/python/test/api/test_interface_api.py | 10 +- openc3/python/test/packets/test_telemetry.py | 272 ++++++++++++++++++ openc3/spec/packets/telemetry_spec.rb | 59 ++++ 10 files changed, 369 insertions(+), 216 deletions(-) create mode 100644 openc3-cosmos-script-runner-api/test/scripts/__init__.py create mode 100644 openc3/python/test/packets/test_telemetry.py diff --git a/.github/workflows/api_tests.yml b/.github/workflows/api_tests.yml index 90a70bc448..48e10d91b6 100644 --- a/.github/workflows/api_tests.yml +++ b/.github/workflows/api_tests.yml @@ -112,13 +112,13 @@ jobs: working-directory: openc3/python - name: Lint with ruff run: | - poetry run ruff --config=../openc3/python/pyproject.toml --output-format=github scripts/*.py - working-directory: openc3-cosmos-script-runner-api + poetry run ruff check --output-format=github ../../openc3-cosmos-script-runner-api/scripts + working-directory: openc3/python - name: Run unit tests run: | - poetry run coverage run -m pytest ./test/ + poetry run coverage run -m pytest ../../openc3-cosmos-script-runner-api/test/ poetry run coverage xml -i - working-directory: openc3-cosmos-script-runner-api + working-directory: openc3/python - uses: codecov/codecov-action@v5 with: working-directory: openc3/python diff --git a/openc3-cosmos-script-runner-api/scripts/run_script.py b/openc3-cosmos-script-runner-api/scripts/run_script.py index 65a22fed35..7e33dd0784 100644 --- a/openc3-cosmos-script-runner-api/scripts/run_script.py +++ b/openc3-cosmos-script-runner-api/scripts/run_script.py @@ -18,20 +18,19 @@ import time import json import sys +import traceback from datetime import datetime, timezone from openc3.script import get_overrides from openc3.utilities.bucket import Bucket from openc3.utilities.store import Store, EphemeralStore from openc3.utilities.extract import convert_to_value from openc3.utilities.logger import Logger -from openc3.environment import * -import traceback +from openc3.environment import OPENC3_CONFIG_BUCKET +from running_script import RunningScript, running_script_anycable_publish start_time = time.time() -from running_script import RunningScript, running_script_anycable_publish - -# # Load the bucket client code to ensure we authenticate outside ENV vars +# Load the bucket client code to ensure we authenticate outside ENV vars Bucket.getClient() del os.environ["OPENC3_BUCKET_USERNAME"] @@ -39,7 +38,7 @@ os.unsetenv("OPENC3_BUCKET_USERNAME") os.unsetenv("OPENC3_BUCKET_PASSWORD") -# # Preload Store and remove Redis secrets from ENV +# Preload Store and remove Redis secrets from ENV Store.instance() EphemeralStore.instance() diff --git a/openc3-cosmos-script-runner-api/scripts/running_script.py b/openc3-cosmos-script-runner-api/scripts/running_script.py index e1dad9eaa2..68fbcb7568 100644 --- a/openc3-cosmos-script-runner-api/scripts/running_script.py +++ b/openc3-cosmos-script-runner-api/scripts/running_script.py @@ -1214,18 +1214,19 @@ def redirect_io(self): Logger.stdout = True Logger.level = Logger.INFO - def output_thread(self): - RunningScript.cancel_output = False - RunningScript.output_sleeper = Sleeper() - while True: - if RunningScript.cancel_output: - break - if (time.time() - self.output_time) > 5.0: - self.handle_output_io() - if RunningScript.cancel_output: - break - if RunningScript.output_sleeper.sleep(1.0): - break + # TODO: This is defined on 206 ... so this is not called + # def output_thread(self): + # RunningScript.cancel_output = False + # RunningScript.output_sleeper = Sleeper() + # while True: + # if RunningScript.cancel_output: + # break + # if (time.time() - self.output_time) > 5.0: + # self.handle_output_io() + # if RunningScript.cancel_output: + # break + # if RunningScript.output_sleeper.sleep(1.0): + # break openc3.script.RUNNING_SCRIPT = RunningScript diff --git a/openc3-cosmos-script-runner-api/test/scripts/__init__.py b/openc3-cosmos-script-runner-api/test/scripts/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openc3-ruby/Dockerfile-ubi b/openc3-ruby/Dockerfile-ubi index 0a97eee6a4..dcf7762ab7 100644 --- a/openc3-ruby/Dockerfile-ubi +++ b/openc3-ruby/Dockerfile-ubi @@ -80,7 +80,10 @@ RUN cd / \ && python3 -m venv /openc3/venv \ && source /openc3/venv/bin/activate \ && pip3 config --global set global.index $PYPI_URL/pypi \ - && pip3 config --global set global.index-url $PYPI_URL/simple + && pip3 config --global set global.index-url $PYPI_URL/simple \ + && pip3 install --upgrade pip setuptools \ + && pip3 install poetry \ + && pip3 install poetry-plugin-export # Begin CVE fix CVE-2023-36617 (update uri 0.12.1 to version 0.12.2 or greater) diff --git a/openc3/python/openc3/packets/packet.py b/openc3/python/openc3/packets/packet.py index a82e2231b5..1506ef21c1 100644 --- a/openc3/python/openc3/packets/packet.py +++ b/openc3/python/openc3/packets/packet.py @@ -277,7 +277,10 @@ def buffer(self, buffer): with self.synchronize(): try: self.internal_buffer_equals(buffer) - except Exception: + # Catch and re-raise the TypeError thrown by internal_buffer_equals + except TypeError as error: + raise error + except ValueError: Logger.error( f"{self.target_name} {self.packet_name} buffer ({type(buffer)}) received with actual packet length of {len(buffer)} but defined length of {self.defined_length}" ) diff --git a/openc3/python/openc3/packets/telemetry.py b/openc3/python/openc3/packets/telemetry.py index c9d74542ed..4c6dc62d1f 100644 --- a/openc3/python/openc3/packets/telemetry.py +++ b/openc3/python/openc3/packets/telemetry.py @@ -41,8 +41,8 @@ def warnings(self): # @return [Array] The command target names (excluding UNKNOWN) def target_names(self): - result = self.config.telemetry.keys() - result.delete("UNKNOWN") + result = list(self.config.telemetry.keys()) + result.remove("UNKNOWN") result.sort() return result @@ -70,158 +70,6 @@ def packet(self, target_name, packet_name): raise RuntimeError(f"Telemetry packet '{upcase_target_name} {upcase_packet_name}' does not exist") return packet - # # @param target_name (see #packet) - # # @param packet_name [String] The packet name. 'LATEST' can also be given - # # to specify the last received (or defined if no packets have been - # # received) packet within the given target that contains the - # # item_name. - # # @param item_name [String] The item name - # # @return [Packet, PacketItem] The packet and the packet item - # def packet_and_item(target_name, packet_name, item_name): - # upcase_packet_name = str(packet_name).upper() - # if upcase_packet_name == "LATEST": - # return_packet = newest_packet(target_name, item_name) - # else: - # return_packet = packet(target_name, packet_name) - # item = return_packet.get_item(item_name) - # return [return_packet, item] - - # # Return a telemetry value from a packet. - # # - # # @param target_name (see #packet_and_item) - # # @param packet_name (see #packet_and_item) - # # @param item_name (see #packet_and_item) - # # @param value_type [Symbol] How to convert the item before returning. - # # Must be one of {Packet::VALUE_TYPES} - # # @return The value. :FORMATTED and :WITH_UNITS values are always returned - # # as Strings. :RAW values will match their data_type. :CONVERTED values - # # can be any type. - # def value(target_name, packet_name, item_name, value_type = 'CONVERTED'): - # packet, _ = packet_and_item(target_name, packet_name, item_name) # Handles LATEST - # return packet.read(item_name, value_type) - - # # Reads the specified list of items and returns their values and limits - # # state. - # # - # # @param item_array [Array] An array - # # consisting of [target name, packet name, item name] - # # @param value_types [Symbol|Array] How to convert the items before - # # returning. A single symbol of {Packet::VALUE_TYPES} - # # can be passed which will convert all items the same way. Or - # # an array of symbols can be passed to control how each item is - # # converted. - # # @return [Array, Array, Array] The first array contains the item values and the - # # second their limits state, and the third their limits settings which includes - # # the red, yellow, and green (if given) limits values. - # def values_and_limits_states(item_array, value_types = 'CONVERTED'): - # items = [] - - # # Verify item_array is a nested array - # raise ValueError(f"item_array must be a nested array consisting of [[tgt,pkt,item],[tgt,pkt,item],...]") if not Array === item_array[0] - - # states = [] - # settings = [] - # limits_set = System.limits_set() - - # if (Array === value_types) and len(item_array) != len(value_types): - # raise ValueError(f"Passed {len(item_array)} items but only {len(value_types)} value types") - - # value_type = value_types if not Array === value_types - # len(item_array).times do |index| - # entry = item_array[index] - # target_name = entry[0] - # packet_name = entry[1] - # item_name = entry[2] - # if Array === value_types: - # value_type = value_types[index] - - # packet, item = packet_and_item(target_name, packet_name, item_name) # Handles LATEST - # items.append(packet.read(item_name, value_type)) - # limits = item.limits - # states.append(limits.state) - # limits_values = limits.values - # if limits_values: - # limits_settings = limits_values[limits_set] - # else: - # limits_settings = None - # settings.append(limits_settings) - - # return [items, states, settings] - - # # @param target_name (see #packet) - # # @param packet_name (see #packet) - # # @return [Array] The telemetry items for the given target and packet name - # def items(target_name, packet_name): - # return packet(target_name, packet_name).sorted_items - - # # @param target_name (see #packet) - # # @param packet_name (see #packet) The packet name. LATEST is supported. - # # @return [Array] The telemetry item names for the given target and packet name - # def item_names(target_name, packet_name): - # if LATEST_PACKET_NAME.casecmp(packet_name).zero?: - # target_upmatch = str(target_name).upper(): - # target_latest_data = self.config.latest_data[target_upcase] - # raise "Telemetry Target '{target_upcase}' does not exist" if not target_latest_data - - # item_names = target_latest_data.keys - # else: - # tlm_packet = packet(target_name, packet_name) - # item_names = [] - # tlm_packet.sorted_items.each { |item| item_names.append(item.name }) - # item_names - - # # Set a telemetry value in a packet. - # # - # # @param target_name (see #packet_and_item) - # # @param packet_name (see #packet_and_item) - # # @param item_name (see #packet_and_item) - # # @param value The value to set in the packet - # # @param value_type (see #tlm) - # def set_value(target_name, packet_name, item_name, value, value_type = 'CONVERTED'): - # packet, _ = packet_and_item(target_name, packet_name, item_name) - # packet.write(item_name, value, value_type) - - # # @param target_name (see #packet_and_item) - # # @param item_name (see #packet_and_item) - # # @return [Array] The latest (most recently arrived) packets with - # # the specified target and item. - # def latest_packets(target_name, item_name): - # target_upmatch = str(target_name).upper(): - # item_upmatch = str(item_name).upper(): - # target_latest_data = self.config.latest_data[target_upcase] - # raise "Telemetry target '{target_upcase}' does not exist" if not target_latest_data - - # packets = self.config.latest_data[target_upcase][item_upcase] - # raise "Telemetry item '{target_upcase} {LATEST_PACKET_NAME} {item_upcase}' does not exist" if not packets - - # return packets - - # # @param target_name (see #packet_and_item) - # # @param item_name (see #packet_and_item) - # # @return [Packet] The packet with the most recent timestamp that contains - # # the specified target and item. - # def newest_packet(target_name, item_name): - # # Handle LATEST_PACKET_NAME - Lookup packets for this target/item - # packets = latest_packets(target_name, item_name) - - # # Find packet with newest timestamp - # newest_packet = None - # newest_received_time = None - # for packet in packets: - # received_time = packet.received_time - # if newest_received_time: - # # See if the received time from this packet is newer. - # # Having the >= makes this method return the last defined packet - # # whether the timestamps are both nil or both equal. - # if received_time and received_time >= newest_received_time: - # newest_packet = packet - # newest_received_time = newest_packet.received_time - # else: - # # No received time yet so take this packet - # newest_packet = packet - # newest_received_time = newest_packet.received_time - # return newest_packet - # Identifies an unknown buffer of data as a defined packet and sets the # packet's data to the given buffer. Identifying a packet uses the fields # marked as ID_ITEM to identify if the buffer passed represents the @@ -263,10 +111,10 @@ def identify(self, packet_data, target_names=None): # No telemetry for this target continue - target = self.system.targets[target_name] + target = self.system.targets.get(target_name) if target and target.tlm_unique_id_mode: # Iterate through the packets and see if any represent the buffer - for _, packet in target_packets: + for _, packet in target_packets.items(): if packet.identify(packet_data): return packet else: @@ -339,36 +187,6 @@ def reset(self): for _, packet in packets.items(): packet.reset() - # # Returns an array with a "TARGET_NAME PACKET_NAME ITEM_NAME" string for every item in the system - # def all_item_strings(include_hidden = False, splash = None): - # strings = [] - # tnames = target_names() - # total = len(tnames) float() - # tnames.each_with_index do |target_name, index| - # if splash: - # splash.message = "Processing {target_name} telemetry" - # splash.progress = index / total - - # # Note: System only has declared target structures but telemetry may have more - # system_target = System.targets[target_name] - # if system_target: - # ignored_items = system_target.ignored_items - # else: - # ignored_items = [] - - # for packet_name, packet in packets(target_name): - # # We don't audit against hidden or disabled packets - # if !include_hidden and (packet.hidden or packet.disabled): - # next - - # packet.items.each_key do |item_name| - # # Skip ignored items - # if !include_hidden and ignored_items.include? item_name: - # next - - # strings.append("{target_name} {packet_name} {item_name}") - # return strings - # @return [Hash{String=>Hash{String=>Packet}}] Hash of all the telemetry # packets keyed by the target name. The value is another hash keyed by the # packet name returning the packet. diff --git a/openc3/python/test/api/test_interface_api.py b/openc3/python/test/api/test_interface_api.py index c8509ee06e..bba7427a24 100644 --- a/openc3/python/test/api/test_interface_api.py +++ b/openc3/python/test/api/test_interface_api.py @@ -47,7 +47,7 @@ def disconnect(self): pass def read_interface(self): - time.sleep(0.05) + time.sleep(0.01) return (b"\x01\x02\x03\x04", None) def interface_cmd(self, cmd_name, *cmd_params): @@ -77,7 +77,7 @@ def build(): self.im = InterfaceMicroservice("DEFAULT__INTERFACE__INST_INT") self.im_thread = threading.Thread(target=self.im.run) self.im_thread.start() - time.sleep(0.01) # Allow the thread to run + time.sleep(0.02) # Allow the thread to run def tearDown(self): self.im.shutdown() @@ -88,7 +88,7 @@ def test_returns_interface_hash(self): self.assertEqual(type(interface), dict) self.assertEqual(interface["name"], "INST_INT") # Verify it also includes the status - self.assertEqual(interface["state"], "ATTEMPTING") + self.assertEqual(interface["state"], "CONNECTED") self.assertEqual(interface["clients"], 0) def test_returns_all_interface_names(self): @@ -99,8 +99,6 @@ def test_returns_all_interface_names(self): self.assertEqual(get_interface_names(), ["INST_INT", "INT1", "INT2"]) def test_connects_the_interface(self): - self.assertEqual(get_interface("INST_INT")["state"], "ATTEMPTING") - time.sleep(0.1) self.assertEqual(get_interface("INST_INT")["state"], "CONNECTED") disconnect_interface("INST_INT") time.sleep(0.1) @@ -134,7 +132,7 @@ def test_should_start_and_stop_raw_logging_on_the_interface(self): def test_gets_interface_name_and_all_info(self): info = get_all_interface_info() self.assertEqual(info[0][0], "INST_INT") - self.assertEqual(info[0][1], "ATTEMPTING") + self.assertEqual(info[0][1], "CONNECTED") def test_successfully_maps_a_target_to_an_interface(self): TargetModel(name="INST", scope="DEFAULT").create() diff --git a/openc3/python/test/packets/test_telemetry.py b/openc3/python/test/packets/test_telemetry.py new file mode 100644 index 0000000000..9874e24382 --- /dev/null +++ b/openc3/python/test/packets/test_telemetry.py @@ -0,0 +1,272 @@ +# Copyright 2025 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +import os +import tempfile +import unittest +from unittest.mock import * +from test.test_helper import mock_redis, setup_system, capture_io +from openc3.system.target import Target +from openc3.system.system import System +from openc3.packets.packet import Packet +from openc3.packets.packet_config import PacketConfig +from openc3.packets.telemetry import Telemetry + + +class TestTelemetry(unittest.TestCase): + def setUp(self): + mock_redis(self) + setup_system() + + tf = tempfile.NamedTemporaryFile(mode="w") + tf.write("# This is a comment\n") + tf.write("#\n") + tf.write('TELEMETRY tgt1 pkt1 LITTLE_ENDIAN "TGT1 PKT1 Description"\n') + tf.write(' APPEND_ID_ITEM item1 8 UINT 1 "Item1"\n') + tf.write(" LIMITS DEFAULT 1 ENABLED 1 2 4 5\n") + tf.write(' APPEND_ITEM item2 8 UINT "Item2"\n') + tf.write(" LIMITS DEFAULT 1 ENABLED 1 2 4 5\n") + tf.write(' APPEND_ITEM item3 8 UINT "Item3"\n') + tf.write(" POLY_READ_CONVERSION 0 2\n") + tf.write(' APPEND_ITEM item4 8 UINT "Item4"\n') + tf.write(" POLY_READ_CONVERSION 0 2\n") + tf.write('TELEMETRY tgt1 pkt2 LITTLE_ENDIAN "TGT1 PKT2 Description"\n') + tf.write(' APPEND_ID_ITEM item1 8 UINT 2 "Item1"\n') + tf.write(' APPEND_ITEM item2 8 UINT "Item2"\n') + tf.write('TELEMETRY tgt2 pkt1 LITTLE_ENDIAN "TGT2 PKT1 Description"\n') + tf.write(' APPEND_ID_ITEM item1 8 UINT 3 "Item1"\n') + tf.write(' APPEND_ITEM item2 8 UINT "Item2"\n') + tf.seek(0) + + # Verify initially that everything is empty + pc = PacketConfig() + pc.process_file(tf.name, "SYSTEM") + System.targets["TGT1"] = Target("TGT1", os.getcwd()) + System.targets["TGT2"] = Target("TGT2", os.getcwd()) + self.tlm = Telemetry(pc, System) + tf.close() + + def test_has_no_warnings(self): + self.assertEqual(Telemetry(PacketConfig(), System).warnings(), []) + + def test_returns_an_empty_array_if_no_targets(self): + self.assertEqual(Telemetry(PacketConfig(), System).target_names(), []) + + def test_returns_all_target_names(self): + self.assertEqual(self.tlm.target_names(), ["TGT1", "TGT2"]) + + def test_packets_complains_about_non_existent_targets(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry target 'TGTX' does not exist"): + self.tlm.packets("tgtX") + + def test_packets_returns_all_packets_target_tgt1(self): + pkts = self.tlm.packets("TGT1") + self.assertEqual(len(pkts), 2) + self.assertIn("PKT1", pkts.keys()) + self.assertIn("PKT2", pkts.keys()) + + def test_packets_returns_all_packets_target_tgt2(self): + pkts = self.tlm.packets("TGT2") + self.assertEqual(len(pkts), 1) + self.assertIn("PKT1", pkts.keys()) + + def test_packet_complains_about_non_existent_targets(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry target 'TGTX' does not exist"): + self.tlm.packet("tgtX", "pkt1") + + def test_packet_complains_about_non_existent_packets(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry packet 'TGT1 PKTX' does not exist"): + self.tlm.packet("TGT1", "PKTX") + + def test_packet_complains_about_the_latest_packet(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry packet 'TGT1 LATEST' does not exist"): + self.tlm.packet("TGT1", "LATEST") + + def test_packet_returns_the_specified_packet(self): + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.target_name, "TGT1") + self.assertEqual(pkt.packet_name, "PKT1") + + def test_identify_returns_None_with_a_None_buffer(self): + self.assertIsNone(self.tlm.identify_and_set_buffer(None)) + + def test_identify_only_checks_the_targets_given(self): + buffer = b"\x01\x02\x03\x04" + self.tlm.identify_and_set_buffer(buffer, ["TGT1"]) + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + + def test_identify_works_in_unique_id_mode_and_not(self): + System.targets["TGT1"] = Target("TGT1", os.getcwd()) + target = System.targets["TGT1"] + buffer = b"\x01\x02\x03\x04" + target.tlm_unique_id_mode = False + pkt = self.tlm.identify_and_set_buffer(buffer, ["TGT1"]) + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + buffer = b"\x01\x02\x01\x02" + target.tlm_unique_id_mode = True + self.tlm.identify_and_set_buffer(buffer, ["TGT1"]) + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 2.0) + self.assertEqual(pkt.read("item4"), 4.0) + target.tlm_unique_id_mode = False + + def test_identify_returns_None_with_unknown_targets_given(self): + buffer = b"\x01\x02\x03\x04" + self.assertIsNone(self.tlm.identify_and_set_buffer(buffer, ["TGTX"])) + + def test_identify_identify_logs_an_invalid_sized_buffer(self): + for stdout in capture_io(): + buffer = b"\x01\x02\x03\x04\x05" + self.tlm.identify_and_set_buffer(buffer) + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + self.assertIn( + "TGT1 PKT1 buffer () received with actual packet length of 5 but defined length of 4", + stdout.getvalue(), + ) + + def test_identify_identifies_tgt1_pkt1(self): + buffer = b"\x01\x02\x03\x04" + self.tlm.identify_and_set_buffer(buffer) + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + + def test_identify_identifies_tgt1_pkt2(self): + buffer = b"\x02\x02" + self.tlm.identify_and_set_buffer(buffer) + pkt = self.tlm.packet("TGT1", "PKT2") + self.assertEqual(pkt.read("item1"), 2) + self.assertEqual(pkt.read("item2"), 2) + + def test_identify_identifies_tgt2_pkt1(self): + buffer = b"\x03\x02" + self.tlm.identify_and_set_buffer(buffer) + pkt = self.tlm.packet("TGT2", "PKT1") + self.assertEqual(pkt.read("item1"), 3) + self.assertEqual(pkt.read("item2"), 2) + + def test_identify_and_define_identifies_packets(self): + unknown = Packet(None, None) + unknown.buffer = b"\x01\x02\x03\x04" + pkt = self.tlm.identify_and_define_packet(unknown) + self.assertEqual(pkt.target_name, "TGT1") + self.assertEqual(pkt.packet_name, "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + + def test_identify_and_define_returns_None_for_unidentified(self): + unknown = Packet(None, None) + unknown.buffer = b"\xFF\xFF\xFF\xFF" + pkt = self.tlm.identify_and_define_packet(unknown) + self.assertIsNone(pkt) + + def test_identify_and_define_defines_packets(self): + unknown = Packet("TGT1", "PKT1") + unknown.buffer = b"\x01\x02\x03\x04" + pkt = self.tlm.identify_and_define_packet(unknown) + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + # It simply returns the packet if it is already identified and defined + pkt2 = self.tlm.identify_and_define_packet(pkt) + self.assertEqual(pkt2, pkt) + + def test_identify_and_define_returns_None_for_undefined(self): + unknown = Packet("TGTX", "PKTX") + unknown.buffer = b"\x01\x02\x03\x04" + pkt = self.tlm.identify_and_define_packet(unknown) + self.assertIsNone(pkt) + + def test_update_complains_about_non_existent_targets(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry target 'TGTX' does not exist"): + self.tlm.update("TGTX", "PKT1", b"\x00") + + def test_update_complains_about_non_existent_packets(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry packet 'TGT1 PKTX' does not exist"): + self.tlm.update("TGT1", "PKTX", b"\x00") + + def test_update_complains_about_the_latest_packet(self): + with self.assertRaisesRegex(RuntimeError, f"Telemetry packet 'TGT1 LATEST' does not exist"): + self.tlm.update("TGT1", "LATEST", b"\x00") + + def test_update_complains_with_a_None_buffer(self): + with self.assertRaisesRegex(TypeError, f"Buffer class is NoneType but must be bytearray"): + self.tlm.update("TGT1", "PKT1", None) + + def test_update_update_logs_an_invalid_sized_buffer(self): + for stdout in capture_io(): + buffer = b"\x01\x02\x03\x04\x05" + self.tlm.update("TGT1", "PKT1", buffer) + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + self.assertIn( + "TGT1 PKT1 buffer () received with actual packet length of 5 but defined length of 4", + stdout.getvalue(), + ) + + def test_update_updates_a_packet_with_the_given_data(self): + self.tlm.update("TGT1", "PKT1", b"\x01\x02\x03\x04") + pkt = self.tlm.packet("TGT1", "PKT1") + self.assertEqual(pkt.read("item1"), 1) + self.assertEqual(pkt.read("item2"), 2) + self.assertEqual(pkt.read("item3"), 6.0) + self.assertEqual(pkt.read("item4"), 8.0) + + def test_assigns_a_callback_to_each_packet(self): + callback = Mock() + self.tlm.set_limits_change_callback(callback) + self.tlm.update("TGT1", "PKT1", b"\x01\x02\x03\x04") + self.tlm.update("TGT1", "PKT2", b"\x05\x06") + self.tlm.update("TGT2", "PKT1", b"\x07\x08") + self.tlm.packet("TGT1", "PKT1").check_limits() + self.tlm.packet("TGT1", "PKT2").check_limits() + self.tlm.packet("TGT2", "PKT1").check_limits() + callback.assert_called() + + def test_reset_resets_all_packets(self): + for _name, pkt in self.tlm.packets("TGT1").items(): + pkt.received_count = 1 + for _name, pkt in self.tlm.packets("TGT2").items(): + pkt.received_count = 1 + self.tlm.reset() + for _name, pkt in self.tlm.packets("TGT1").items(): + self.assertEqual(pkt.received_count, 0) + for _name, pkt in self.tlm.packets("TGT2").items(): + self.assertEqual(pkt.received_count, 0) + + def test_all_returns_all_packets(self): + self.assertEqual(list(self.tlm.all().keys()), ["UNKNOWN", "TGT1", "TGT2"]) diff --git a/openc3/spec/packets/telemetry_spec.rb b/openc3/spec/packets/telemetry_spec.rb index 98f9b53661..eea892da96 100644 --- a/openc3/spec/packets/telemetry_spec.rb +++ b/openc3/spec/packets/telemetry_spec.rb @@ -348,6 +348,47 @@ module OpenC3 end end + describe "identify_and_define_packet" do + it "identifies packets" do + unknown = Packet.new(nil, nil) + unknown.buffer = "\x01\x02\x03\x04" + pkt = @tlm.identify_and_define_packet(unknown) + expect(pkt.target_name).to eql "TGT1" + expect(pkt.packet_name).to eql "PKT1" + expect(pkt.read("item1")).to eql 1 + expect(pkt.read("item2")).to eql 2 + expect(pkt.read("item3")).to eql 6.0 + expect(pkt.read("item4")).to eql 8.0 + end + + it "returns nil for unidentified" do + unknown = Packet.new(nil, nil) + unknown.buffer = "\xFF\xFF\xFF\xFF" + pkt = @tlm.identify_and_define_packet(unknown) + expect(pkt).to be_nil + end + + it "defines packets" do + unknown = Packet.new("TGT1", "PKT1") + unknown.buffer = "\x01\x02\x03\x04" + pkt = @tlm.identify_and_define_packet(unknown) + expect(pkt.read("item1")).to eql 1 + expect(pkt.read("item2")).to eql 2 + expect(pkt.read("item3")).to eql 6.0 + expect(pkt.read("item4")).to eql 8.0 + # It simply returns the packet if it is already identified and defined + pkt2 = @tlm.identify_and_define_packet(pkt) + expect(pkt2).to eql pkt + end + + it "returns nil for undefined packets" do + unknown = Packet.new("TGTX", "PKTX") + unknown.buffer = "\x01\x02\x03\x04" + pkt = @tlm.identify_and_define_packet(unknown) + expect(pkt).to be_nil + end + end + describe "update!" do it "complains about non-existent targets" do expect { @tlm.update!("TGTX", "PKT1", "\x00") }.to raise_error(RuntimeError, "Telemetry target 'TGTX' does not exist") @@ -547,6 +588,24 @@ module OpenC3 end end + describe "reset" do + it "resets all packets" do + @tlm.packets("TGT1").each do |name, pkt| + pkt.received_count = 1 + end + @tlm.packets("TGT2").each do |name, pkt| + pkt.received_count = 1 + end + @tlm.reset() + @tlm.packets("TGT1").each do |name, pkt| + expect(pkt.received_count).to eql 0 + end + @tlm.packets("TGT2").each do |name, pkt| + expect(pkt.received_count).to eql 0 + end + end + end + describe "all_item_strings" do it "returns hidden TGT,PKT,ITEMs in the system" do @tlm.packet("TGT1", "PKT1").hidden = true