Skip to content

Commit

Permalink
Merge pull request #1684 from OpenC3/python_cmd_response_protocol_and…
Browse files Browse the repository at this point in the history
…_template_accessor

Python TemplateAccessor and CmdResponseProtocol
  • Loading branch information
ryanmelt authored Nov 8, 2024
2 parents 3e99804 + 931f66a commit 74aa4b7
Show file tree
Hide file tree
Showing 4 changed files with 593 additions and 0 deletions.
155 changes: 155 additions & 0 deletions openc3/python/openc3/accessors/template_accessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright 2024 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.accessors.accessor import Accessor
import re


class TemplateAccessor(Accessor):
def __init__(self, packet, left_char="<", right_char=">"):
super().__init__(packet)
self.left_char = left_char
self.right_char = right_char
self.configured = False

def configure(self):
if self.configured:
return

escaped_left_char = self.left_char
if self.left_char == "(":
escaped_left_char = f"\\{self.left_char}"
escaped_right_char = self.right_char
if self.right_char == ")":
escaped_right_char = f"\\{self.right_char}"

# Convert the template into a Regexp for reading each item
template = self.packet.template[:].decode()
template_items = re.compile(f"{escaped_left_char}.*?{escaped_right_char}", re.X).findall(template)
escaped_read_template = re.escape(template)

self.item_keys = []
for item in template_items:
self.item_keys.append(item[1:-1])
# If they're using parens we have to escape them
# since we're working with the already escaped template
if self.left_char == "(":
item = f"\({item[1:]}"
if self.right_char == ")":
item = f"{item[0:-1]}\)"
escaped_read_template = escaped_read_template.replace(item, "(.*)")
self.read_regexp = re.compile(escaped_read_template, re.X)
self.configured = True

def read_item(self, item, buffer):
if item.data_type == "DERIVED":
return None
self.configure()

# Scan the response for all the variables in brackets <VARIABLE>
values = self.read_regexp.match(buffer.decode())
if values is not None:
values = values.groups()
if values is None or (len(values) != len(self.item_keys)):
num_items = 0
if values is not None:
num_items = len(values)
raise RuntimeError(
f"Unexpected number of items found in buffer: {num_items}, Expected: {len(self.item_keys)}"
)
else:
for i, value in enumerate(values):
item_key = self.item_keys[i]
if item_key == item.key:
return Accessor.convert_to_type(value, item)

raise RuntimeError(f"Response does not include key {item.key}: {buffer}")

def read_items(self, items, buffer):
result = {}
self.configure()

# Scan the response for all the variables in brackets <VARIABLE>
values = self.read_regexp.match(buffer.decode())
if values is not None:
values = values.groups()
if values is None or (len(values) != len(self.item_keys)):
num_items = 0
if values is not None:
num_items = len(values)
raise RuntimeError(
f"Unexpected number of items found in buffer: {num_items}, Expected: {len(self.item_keys)}"
)
else:
for item in items:
if item.data_type == "DERIVED":
result[item.name] = None
continue
try:
index = self.item_keys.index(item.key)
result[item.name] = Accessor.convert_to_type(values[index], item)
except ValueError:
raise RuntimeError(f"Unknown item with key {item.key} requested")

return result

def write_item(self, item, value, buffer):
if item.data_type == "DERIVED":
return None
self.configure()

updated_buffer = buffer.decode().replace(f"{self.left_char}{item.key}{self.right_char}", str(value)).encode()

if buffer == updated_buffer:
raise RuntimeError(f"Key {item.key} not found in template")
buffer[0:] = updated_buffer
return value

def write_items(self, items, values, buffer):
self.configure()
for index, item in enumerate(items):
if item.data_type == "DERIVED":
continue
updated_buffer = (
buffer.decode().replace(f"{self.left_char}{item.key}{self.right_char}", str(values[index])).encode()
)

if buffer == updated_buffer:
raise RuntimeError(f"Key {item.key} not found in template")
buffer[0:] = updated_buffer
return values

# If this is set it will enforce that buffer data is encoded
# in a specific encoding
def enforce_encoding(self):
return None

# This affects whether the Packet class enforces the buffer
# length at all. Set to false to remove any correlation between
# buffer length and defined sizes of items in COSMOS
def enforce_length(self):
return False

# This sets the short_buffer_allowed flag in the Packet class
# which allows packets that have a buffer shorter than the defined size.
# Note that the buffer is still resized to the defined length
def enforce_short_buffer_allowed(self):
return True

# If this is true it will enforce that COSMOS DERIVED items must have a
# write_conversion to be written
def enforce_derived_write_conversion(self, _item):
return True
113 changes: 113 additions & 0 deletions openc3/python/openc3/interfaces/protocols/cmd_response_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2024 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.system.system import System
from openc3.config.config_parser import ConfigParser
from openc3.utilities.logger import Logger
from openc3.interfaces.protocols.protocol import Protocol
from queue import SimpleQueue, Empty
import time


# Protocol that waits for a response for any commands with a defined response packet.
# The response packet is identified but not defined by the protocol.
class CmdResponseProtocol(Protocol):
# @param response_timeout [Float] Number of seconds to wait before timing out
# when waiting for a response
# @param response_polling_period [Float] Number of seconds to wait between polling
# for a response
# @param raise_exceptions [String] Whether to raise exceptions when errors
# occur in the protocol like unexpected responses or response timeouts.
# @param allow_empty_data [true/false/nil] See Protocol#initialize
def __init__(
self, response_timeout=5.0, response_polling_period=0.02, raise_exceptions=False, allow_empty_data=None
):
super().__init__(allow_empty_data)
self.response_timeout = ConfigParser.handle_none(response_timeout)
if self.response_timeout:
self.response_timeout = float(response_timeout)
self.response_polling_period = float(response_polling_period)
self.raise_exceptions = ConfigParser.handle_true_false(raise_exceptions)
self.write_block_queue = SimpleQueue()
self.response_packet = None

def connect_reset(self):
super().connect_reset()
try:
while self.write_block_queue.qsize != 0:
self.write_block_queue.get_nowait()
except Empty:
pass

def disconnect_reset(self):
super().disconnect_reset()
self.write_block_queue.put(None) # Unblock the write block queue

def read_packet(self, packet):
if self.response_packet is not None:
# Grab the response packet specified in the command
result_packet = System.telemetry.packet(self.response_packet[0], self.response_packet[1]).clone()
result_packet.buffer = packet.buffer
result_packet.received_time = None
result_packet.stored = packet.stored
result_packet.extra = packet.extra

# Release the write
self.write_block_queue.put(None)

# This returns the fully identified and defined packet
# Received time is handled by the interface microservice
return result_packet
else:
return packet

def write_packet(self, packet):
# Setup the response packet (if there is one)
# This primes waiting for the response in post_write_interface
self.response_packet = packet.response

return packet

def post_write_interface(self, packet, data, extra=None):
if self.response_packet is not None:
if self.response_timeout:
response_timeout_time = time.time() + self.response_timeout
else:
response_timeout_time = None

# Block the write until the response is received
while True:
try:
self.write_block_queue.get_nowait()
break
except Empty:
time.sleep(self.response_polling_period)
if response_timeout_time is None:
continue
if response_timeout_time and time.time() < response_timeout_time:
continue
interface_name = ""
if self.interface is not None:
interface_name = self.interface.name
self.handle_error(f"{interface_name}: Timeout waiting for response")

self.response_packet = None
return super().post_write_interface(packet, data, extra)

def handle_error(self, msg):
Logger.error(msg)
if self.raise_exceptions:
raise RuntimeError(msg)
Loading

0 comments on commit 74aa4b7

Please sign in to comment.