diff --git a/openc3/python/openc3/accessors/template_accessor.py b/openc3/python/openc3/accessors/template_accessor.py new file mode 100644 index 0000000000..c229d7f9f9 --- /dev/null +++ b/openc3/python/openc3/accessors/template_accessor.py @@ -0,0 +1,155 @@ +# Copyright 2024 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +from openc3.accessors.accessor import Accessor +import re + + +class TemplateAccessor(Accessor): + def __init__(self, packet, left_char="<", right_char=">"): + super().__init__(packet) + self.left_char = left_char + self.right_char = right_char + self.configured = False + + def configure(self): + if self.configured: + return + + escaped_left_char = self.left_char + if self.left_char == "(": + escaped_left_char = f"\\{self.left_char}" + escaped_right_char = self.right_char + if self.right_char == ")": + escaped_right_char = f"\\{self.right_char}" + + # Convert the template into a Regexp for reading each item + template = self.packet.template[:] + template_items = re.compile(f"{escaped_left_char}.*?{escaped_right_char}", re.X).findall(template.decode()) + escaped_read_template = re.escape(template.decode()) + + self.item_keys = [] + for item in template_items: + self.item_keys.append(item[1:-1]) + # If they're using parens we have to escape them + # since we're working with the already escaped template + if self.left_char == "(": + item = f"\{item}" + if self.right_char == ")": + item = f"{item[0:-1]}\)" + escaped_read_template = escaped_read_template.replace(item, "(.*)") + self.read_regexp = re.compile(escaped_read_template, re.X) + self.configured = True + + def read_item(self, item, buffer): + if item.data_type == "DERIVED": + return None + self.configure() + + # Scan the response for all the variables in brackets + values = self.read_regexp.match(buffer.decode()) + if values is not None: + values = values.groups() + if values is None or (len(values) != len(self.item_keys)): + num_items = 0 + if values is not None: + num_items = len(values) + raise RuntimeError( + f"Unexpected number of items found in buffer= {num_items}, Expected= {len(self.item_keys)}" + ) + else: + for i, value in enumerate(values): + item_key = self.item_keys[i] + if item_key == item.key: + return Accessor.convert_to_type(value, item) + + raise RuntimeError(f"Response does not include key {item.key}: {buffer}") + + def read_items(self, items, buffer): + result = {} + self.configure() + + # Scan the response for all the variables in brackets + values = self.read_regexp.match(buffer.decode()) + if values is not None: + values = values.groups() + if values is None or (len(values) != len(self.item_keys)): + num_items = 0 + if values is not None: + num_items = len(values) + raise RuntimeError( + f"Unexpected number of items found in buffer= {num_items}, Expected= {len(self.item_keys)}" + ) + else: + for item in items: + if item.data_type == "DERIVED": + result[item.name] = None + continue + try: + index = self.item_keys.index(item.key) + result[item.name] = Accessor.convert_to_type(values[index], item) + except ValueError: + raise RuntimeError(f"Unknown item with key {item.key} requested") + + return result + + def write_item(self, item, value, buffer): + if item.data_type == "DERIVED": + return None + self.configure() + + updated_buffer = buffer.decode().replace(f"{self.left_char}{item.key}{self.right_char}", str(value)).encode() + + if buffer == updated_buffer: + raise RuntimeError(f"Key {item.key} not found in template") + buffer[0:] = updated_buffer + return value + + def write_items(self, items, values, buffer): + self.configure() + for index, item in enumerate(items): + if item.data_type == "DERIVED": + continue + updated_buffer = ( + buffer.decode().replace(f"{self.left_char}{item.key}{self.right_char}", str(values[index])).encode() + ) + + if buffer == updated_buffer: + raise RuntimeError(f"Key {item.key} not found in template") + buffer[0:] = updated_buffer + return values + + # If this is set it will enforce that buffer data is encoded + # in a specific encoding + def enforce_encoding(self): + return None + + # This affects whether the Packet class enforces the buffer + # length at all. Set to false to remove any correlation between + # buffer length and defined sizes of items in COSMOS + def enforce_length(self): + return False + + # This sets the short_buffer_allowed flag in the Packet class + # which allows packets that have a buffer shorter than the defined size. + # Note that the buffer is still resized to the defined length + def enforce_short_buffer_allowed(self): + return True + + # If this is true it will enforce that COSMOS DERIVED items must have a + # write_conversion to be written + def enforce_derived_write_conversion(self, _item): + return True diff --git a/openc3/python/openc3/interfaces/protocols/cmd_response_protocol.py b/openc3/python/openc3/interfaces/protocols/cmd_response_protocol.py new file mode 100644 index 0000000000..cb0226fc7f --- /dev/null +++ b/openc3/python/openc3/interfaces/protocols/cmd_response_protocol.py @@ -0,0 +1,113 @@ +# Copyright 2024 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +from openc3.system.system import System +from openc3.config.config_parser import ConfigParser +from openc3.utilities.logger import Logger +from openc3.interfaces.protocols.protocol import Protocol +from queue import SimpleQueue, Empty +import time + + +# Protocol that waits for a response for any commands with a defined response packet. +# The response packet is identified but not defined by the protocol. +class CmdResponseProtocol(Protocol): + # @param response_timeout [Float] Number of seconds to wait before timing out + # when waiting for a response + # @param response_polling_period [Float] Number of seconds to wait between polling + # for a response + # @param raise_exceptions [String] Whether to raise exceptions when errors + # occur in the protocol like unexpected responses or response timeouts. + # @param allow_empty_data [true/false/nil] See Protocol#initialize + def __init__( + self, response_timeout=5.0, response_polling_period=0.02, raise_exceptions=False, allow_empty_data=None + ): + super().__init__(allow_empty_data) + self.response_timeout = ConfigParser.handle_none(response_timeout) + if self.response_timeout: + self.response_timeout = float(response_timeout) + self.response_polling_period = float(response_polling_period) + self.raise_exceptions = ConfigParser.handle_true_false(raise_exceptions) + self.write_block_queue = SimpleQueue() + self.response_packet = None + + def connect_reset(self): + super().connect_reset() + try: + while self.write_block_queue.qsize != 0: + self.write_block_queue.get_nowait() + except Empty: + pass + + def disconnect_reset(self): + super().disconnect_reset() + self.write_block_queue.put(None) # Unblock the write block queue + + def read_packet(self, packet): + if self.response_packet is not None: + # Grab the response packet specified in the command + result_packet = System.telemetry.packet(self.response_packet[0], self.response_packet[1]).clone() + result_packet.buffer = packet.buffer + result_packet.received_time = None + result_packet.stored = packet.stored + result_packet.extra = packet.extra + + # Release the write + self.write_block_queue.put(None) + + # This returns the fully identified and defined packet + # Received time is handled by the interface microservice + return result_packet + else: + return packet + + def write_packet(self, packet): + # Setup the response packet (if there is one) + # This primes waiting for the response in post_write_interface + self.response_packet = packet.response + + return packet + + def post_write_interface(self, packet, data, extra=None): + if self.response_packet is not None: + if self.response_timeout: + response_timeout_time = time.time() + self.response_timeout + else: + response_timeout_time = None + + # Block the write until the response is received + while True: + try: + self.write_block_queue.get_nowait() + break + except Empty: + time.sleep(self.response_polling_period) + if response_timeout_time is None: + continue + if response_timeout_time and time.time() < response_timeout_time: + continue + interface_name = "" + if self.interface is not None: + interface_name = self.interface.name + self.handle_error(f"{interface_name}: Timeout waiting for response") + + self.response_packet = None + return super().post_write_interface(packet, data, extra) + + def handle_error(self, msg): + Logger.error(msg) + if self.raise_exceptions: + raise RuntimeError(msg) diff --git a/openc3/python/test/accessors/test_template_accessor.py b/openc3/python/test/accessors/test_template_accessor.py new file mode 100644 index 0000000000..32af5044f8 --- /dev/null +++ b/openc3/python/test/accessors/test_template_accessor.py @@ -0,0 +1,145 @@ +# Copyright 2024 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +import unittest +from unittest.mock import * +from test.test_helper import * +from openc3.accessors.template_accessor import TemplateAccessor +from openc3.packets.packet import Packet +from collections import namedtuple + + +class TestTemplateAccessor(unittest.TestCase): + def setUp(self): + self.packet = Packet() + self.packet.template = b"MEAS:VOLT (@); SOMETHING ELSE ;" + self.data = b"MEAS:VOLT (@2); SOMETHING ELSE 5.67;" + + self.packet2 = Packet() + self.packet2.template = b"MEAS:VOLT <@(CHANNEL)>; SOMETHING ELSE (MYVALUE);" + self.data2 = b"MEAS:VOLT <@2>; SOMETHING ELSE 5.67;" + + def test_should_escape_regexp_chars(self): + packet = Packet() + packet.template = b"VOLT , ($1); VOLT? (+1)" + data = b"VOLT 5, ($1); VOLT? (+1)" + accessor = TemplateAccessor(packet) + packet.buffer = data + + item1 = namedtuple("Item", ["name", "key", "data_type", "array_size"]) + item1.name = "VOLTAGE" + item1.key = "VOLTAGE" + item1.data_type = "FLOAT" + item1.array_size = None + value = accessor.read_item(item1, packet.buffer) + self.assertEqual(value, 5.0) + + def test_should_allow_different_delimiters(self): + packet = Packet() + packet.template = b"VOLT (VOLTAGE), *1.0; VOLT? *1.0" + data = b"VOLT 5, *1.0; VOLT? *1.0" + accessor = TemplateAccessor(packet, "(", ")") + packet.buffer = data + + item1 = namedtuple("Item", ["name", "key", "data_type", "array_size"]) + item1.name = "VOLTAGE" + item1.key = "VOLTAGE" + item1.data_type = "FLOAT" + item1.array_size = None + value = accessor.read_item(item1, packet.buffer_no_copy()) + self.assertEqual(value, 5.0) + + def test_should_read_values(self): + accessor = TemplateAccessor(self.packet) + self.packet.buffer = self.data + + item1 = namedtuple("Item", ["name", "key", "data_type", "array_size"]) + item1.name = "CHANNEL" + item1.key = "CHANNEL" + item1.data_type = "UINT" + item1.array_size = None + value = accessor.read_item(item1, self.packet.buffer_no_copy()) + self.assertEqual(value, 2) + + item2 = namedtuple("Item", ["name", "key", "data_type", "array_size"]) + item2.name = "MYVALUE" + item2.key = "MYVALUE" + item2.data_type = "FLOAT" + item2.array_size = None + value = accessor.read_item(item2, self.packet.buffer_no_copy()) + self.assertAlmostEqual(value, 5.67, places=2) + + values = accessor.read_items([item1, item2], self.packet.buffer_no_copy()) + self.assertEqual(values["CHANNEL"], 2) + self.assertAlmostEqual(values["MYVALUE"], 5.67, places=2) + + accessor = TemplateAccessor(self.packet2, "(", ")") + self.packet2.buffer = self.data2 + + value = accessor.read_item(item1, self.packet2.buffer_no_copy()) + self.assertEqual(value, 2) + + value = accessor.read_item(item2, self.packet2.buffer_no_copy()) + self.assertAlmostEqual(value, 5.67, places=2) + + values = accessor.read_items([item1, item2], self.packet2.buffer_no_copy()) + self.assertEqual(values["CHANNEL"], 2) + self.assertAlmostEqual(values["MYVALUE"], 5.67, places=2) + + def test_should_write_values(self): + accessor = TemplateAccessor(self.packet) + self.packet.restore_defaults() + + item1 = namedtuple("Item", ["name", "key", "data_type", "array_size"]) + item1.name = "CHANNEL" + item1.key = "CHANNEL" + item1.data_type = "UINT" + item1.array_size = None + value = accessor.write_item(item1, 3, self.packet.buffer_no_copy()) + self.assertEqual(value, 3) + self.assertEqual(self.packet.buffer, b"MEAS:VOLT (@3); SOMETHING ELSE ;") + + item2 = namedtuple("Item", ["name", "key", "data_type", "array_size"]) + item2.name = "MYVALUE" + item2.key = "MYVALUE" + item2.data_type = "FLOAT" + item2.array_size = None + value = accessor.write_item(item2, 1.234, self.packet.buffer_no_copy()) + self.assertAlmostEqual(value, 1.234, places=2) + self.assertEqual(self.packet.buffer, b"MEAS:VOLT (@3); SOMETHING ELSE 1.234;") + + self.packet.restore_defaults() + accessor.write_items([item1, item2], [4, 2.345], self.packet.buffer_no_copy()) + values = accessor.read_items([item1, item2], self.packet.buffer_no_copy()) + self.assertEqual(values["CHANNEL"], 4) + self.assertAlmostEqual(values["MYVALUE"], 2.345, places=2) + + accessor = TemplateAccessor(self.packet2, "(", ")") + self.packet2.restore_defaults() + + value = accessor.write_item(item1, 3, self.packet2.buffer_no_copy()) + self.assertEqual(value, 3) + self.assertEqual(self.packet2.buffer, b"MEAS:VOLT <@3>; SOMETHING ELSE (MYVALUE);") + + value = accessor.write_item(item2, 1.234, self.packet2.buffer_no_copy()) + self.assertAlmostEqual(value, 1.234, places=2) + self.assertEqual(self.packet2.buffer, b"MEAS:VOLT <@3>; SOMETHING ELSE 1.234;") + + self.packet2.restore_defaults() + accessor.write_items([item1, item2], [4, 2.345], self.packet2.buffer_no_copy()) + values = accessor.read_items([item1, item2], self.packet2.buffer_no_copy()) + self.assertEqual(values["CHANNEL"], 4) + self.assertAlmostEqual(values["MYVALUE"], 2.345, places=2) diff --git a/openc3/python/test/interfaces/protocols/test_cmd_response_protocol.py b/openc3/python/test/interfaces/protocols/test_cmd_response_protocol.py new file mode 100644 index 0000000000..3349a22ce6 --- /dev/null +++ b/openc3/python/test/interfaces/protocols/test_cmd_response_protocol.py @@ -0,0 +1,180 @@ +# Copyright 2024 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +import time +import unittest +import threading +import tempfile +from unittest.mock import * +from test.test_helper import * +from openc3.packets.packet import Packet +from openc3.packets.packet_config import PacketConfig +from openc3.packets.telemetry import Telemetry +from openc3.accessors.template_accessor import TemplateAccessor +from openc3.interfaces.stream_interface import StreamInterface +from openc3.interfaces.protocols.cmd_response_protocol import CmdResponseProtocol +from openc3.streams.stream import Stream + + +class TestCmdResponseProtocol(unittest.TestCase): + write_buffer = "" + read_buffer = "" + + class CmdResponseStream(Stream): + + def connect(self): + pass + + def connected(self): + return True + + def disconnect(self): + pass + + def read_nonblock(self): + return [] + + def write(self, buffer): + TestCmdResponseProtocol.write_buffer = buffer + + def read(self): + return TestCmdResponseProtocol.read_buffer + + class MyInterface(StreamInterface): + def connected(self): + return True + + def setUp(self): + self.interface = self.MyInterface() + TestCmdResponseProtocol.write_buffer = "" + TestCmdResponseProtocol.read_buffer = "" + self.read_cnt = 0 + self.read_result = None + + def test_unblocks_writes_waiting_for_responses(self): + self.interface.stream = self.CmdResponseStream() + self.interface.add_protocol(CmdResponseProtocol, [], "READ_WRITE") + packet = Packet("TGT", "CMD") + packet.template = b"SOUR:VOLT" + packet.response = ["TGT", "READ_VOLTAGE"] + packet.restore_defaults() + + # write blocks waiting for the response so spawn a thread + def my_write(): + self.interface.write(packet) + + thread = threading.Thread(target=my_write) + thread.start() + + time.sleep(0.1) + self.interface.disconnect() + time.sleep(0.1) + thread.join() + + def test_works_without_a_response(self): + self.interface.stream = self.CmdResponseStream() + self.interface.add_protocol(CmdResponseProtocol, [], "READ_WRITE") + packet = Packet("TGT", "CMD") + packet.append_item("VOLTAGE", 16, "UINT") + packet.get_item("VOLTAGE").default = 1 + packet.append_item("CHANNEL", 16, "UINT") + packet.get_item("CHANNEL").default = 2 + packet.template = b"SOUR:VOLT , (@)" + packet.accessor = TemplateAccessor(packet) + packet.restore_defaults() + self.interface.write(packet) + self.assertEqual(TestCmdResponseProtocol.write_buffer, b"SOUR:VOLT 1, (@2)") + + def test_logs_an_error_if_it_doesnt_receive_a_response(self): + self.interface.stream = self.CmdResponseStream() + self.interface.add_protocol(CmdResponseProtocol, [1.5, 0.02, True], "READ_WRITE") + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "CMD") + packet.template = b"GO" + packet.response = ["TGT", "DATA"] + packet.restore_defaults() + self.interface.connect() + start = time.time() + with self.assertRaisesRegex(RuntimeError, "Timeout waiting for response"): + self.interface.write(packet) + self.assertAlmostEqual(time.time() - start, 1.5, places=1) + + def test_disconnects_if_it_doesnt_receive_a_response(self): + self.interface.stream = self.CmdResponseStream() + self.interface.add_protocol(CmdResponseProtocol, [1.5, 0.02, True], "READ_WRITE") + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "CMD") + packet.template = b"GO" + packet.response = ["TGT", "DATA"] + packet.restore_defaults + self.interface.connect + start = time.time() + with self.assertRaisesRegex(RuntimeError, "Timeout waiting for response"): + self.interface.write(packet) + self.assertAlmostEqual(time.time() - start, 1.5, places=1) + + def test_doesnt_expect_responses_for_empty_response_fields(self): + self.interface.stream = self.CmdResponseStream() + self.interface.add_protocol(CmdResponseProtocol, [], "READ_WRITE") + self.interface.target_names = ["TGT"] + packet = Packet("TGT", "CMD") + packet.template = b"GO" + packet.restore_defaults + self.interface.connect + self.interface.write(packet) + + @patch("openc3.interfaces.protocols.cmd_response_protocol.System") + def test_processes_responses_with_no_id_fields(self, mock_system): + tf = tempfile.NamedTemporaryFile(mode="w") + tf.write("TELEMETRY TGT READ_VOLTAGE BIG_ENDIAN\n") + tf.write(" ACCESSOR TemplateAccessor\n") + tf.write(' TEMPLATE ""\n') + tf.write(" APPEND_ITEM VOLTAGE 16 UINT\n") + tf.seek(0) + pc = PacketConfig() + pc.process_file(tf.name, "SYSTEM") + tf.close() + + mock_system.telemetry = Telemetry(pc, mock_system) + self.interface.stream = self.CmdResponseStream() + self.interface.add_protocol(CmdResponseProtocol, [1.5, 0.02, True], "READ_WRITE") + # Add extra target names to the interface to ensure we grab the correct one + self.interface.target_names = ["BLAH", "TGT", "OTHER"] + packet = Packet("TGT", "CMD") + packet.accessor = TemplateAccessor(packet) + packet.append_item("VOLTAGE", 16, "UINT") + packet.get_item("VOLTAGE").default = 11 + packet.append_item("CHANNEL", 16, "UINT") + packet.get_item("CHANNEL").default = 1 + packet.template = b"SOUR:VOLT , (@)" + packet.response = ["TGT", "READ_VOLTAGE"] + packet.restore_defaults() + self.interface.connect() + self.read_result = None + TestCmdResponseProtocol.read_buffer = b"\x31\x30" # ASCII 31, 30 is '10' + + # write blocks waiting for the response so spawn a thread + def my_read(): + time.sleep(0.5) + self.read_result = self.interface.read() + + thread = threading.Thread(target=my_read) + thread.start() + + self.interface.write(packet) + time.sleep(0.55) + self.assertEqual(TestCmdResponseProtocol.write_buffer, b"SOUR:VOLT 11, (@1)") + self.assertEqual(self.read_result.read("VOLTAGE"), (10))