Skip to content

Commit

Permalink
Update python tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jmthomas committed Feb 12, 2025
1 parent 449cc59 commit e80e41f
Show file tree
Hide file tree
Showing 25 changed files with 271 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def solar_panel_thread_method(self):
self.solar_panel_thread_cancel = False
break

def graceful_kill(self):
def graceful_kill(self, timeout):
self.solar_panel_thread_cancel = True

def read(self, count_100hz, time):
Expand Down
2 changes: 1 addition & 1 deletion openc3-cosmos-script-runner-api/scripts/running_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ def as_json(self):

# Private methods

def graceful_kill(self):
def graceful_kill(self, timeout):
self.stop = True

def initialize_variables(self):
Expand Down
3 changes: 1 addition & 2 deletions openc3/lib/openc3/topics/decom_interface_topic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

module OpenC3
class DecomInterfaceTopic < Topic
def self.build_cmd(target_name, cmd_name, cmd_params, range_check, raw, scope:)
def self.build_cmd(target_name, cmd_name, cmd_params, range_check, raw, timeout: 5, scope:)
data = {}
data['target_name'] = target_name.to_s.upcase
data['cmd_name'] = cmd_name.to_s.upcase
Expand All @@ -34,7 +34,6 @@ def self.build_cmd(target_name, cmd_name, cmd_params, range_check, raw, scope:)
Topic.update_topic_offsets([ack_topic])
decom_id = Topic.write_topic("#{scope}__DECOMINTERFACE__{#{target_name}}",
{ 'build_cmd' => JSON.generate(data, allow_nan: true) }, '*', 100)
timeout = 5 # Arbitrary 5s timeout
time = Time.now
while (Time.now - time) < timeout
Topic.read_topics([ack_topic]) do |_topic, _msg_id, msg_hash, _redis|
Expand Down
6 changes: 3 additions & 3 deletions openc3/python/openc3/api/cmd_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -163,7 +163,7 @@ def cmd_raw_no_checks(*args, **kwargs):


# Build a command binary
def build_cmd(*args, range_check=True, raw=False, scope=OPENC3_SCOPE, manual=False):
def build_cmd(*args, range_check=True, raw=False, timeout=5, scope=OPENC3_SCOPE, manual=False):
match len(args):
case 1:
target_name, cmd_name, cmd_params = extract_fields_from_cmd_text(args[0])
Expand All @@ -181,7 +181,7 @@ def build_cmd(*args, range_check=True, raw=False, scope=OPENC3_SCOPE, manual=Fal
cmd_name = cmd_name.upper()
cmd_params = {k.upper(): v for k, v in cmd_params.items()}
authorize(permission="cmd_info", target_name=target_name, scope=scope, manual=manual)
return DecomInterfaceTopic.build_cmd(target_name, cmd_name, cmd_params, range_check, raw, scope)
return DecomInterfaceTopic.build_cmd(target_name, cmd_name, cmd_params, range_check, raw, timeout, scope)


# build_command is DEPRECATED
Expand Down
4 changes: 2 additions & 2 deletions openc3/python/openc3/interfaces/tcpip_server_interface.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -215,7 +215,7 @@ def disconnect(self):
super().disconnect()

# Gracefully kill all the threads
def graceful_kill(self):
def graceful_kill(self, timeout):
# This method is just here to prevent warnings
pass

Expand Down
24 changes: 9 additions & 15 deletions openc3/python/openc3/logs/log_writer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -125,15 +125,13 @@ def start(self):

# Stops all logging and closes the current log file.
def stop(self):
threads = None
with self.mutex:
threads = self.close_file(False)
self.close_file(False)
self.logging_enabled = False
return threads

# Stop all logging, close the current log file, and kill the logging threads.
def shutdown(self):
threads = self.stop()
self.stop()
with LogWriter.mutex:
LogWriter.instances.remove(self)
if len(LogWriter.instances) <= 0:
Expand All @@ -142,13 +140,9 @@ def shutdown(self):
if LogWriter.cycle_thread:
kill_thread(self, LogWriter.cycle_thread)
LogWriter.cycle_thread = None
# Wait for BucketUtilities to finish move_log_file_to_bucket_thread
for thread in threads:
thread.join()
self.tmp_dir.cleanup()
return threads

def graceful_kill(self):
def graceful_kill(self, timeout):
self.cancel_threads = True

# implementation details
Expand Down Expand Up @@ -277,7 +271,7 @@ def prepare_write(
if allow_new_file:
self.start_new_file()
elif self.cycle_size and ((self.file_size + data_length) > self.cycle_size):
Logger.debug("Log writer start new file due to cycle size {self.cycle_size}")
Logger.debug(f"Log writer start new file due to cycle size {self.cycle_size}")
if allow_new_file:
self.start_new_file()
elif (
Expand All @@ -289,7 +283,7 @@ def prepare_write(
# Changed to just a error to prevent file thrashing
if not self.out_of_order:
Logger.error(
"Log writer out of order time detected (increase buffer depth?): {Time.from_nsec_from_epoch(self.previous_time_nsec_since_epoch)} {Time.from_nsec_from_epoch(time_nsec_since_epoch)}"
f"Log writer out of order time detected (increase buffer depth?): {from_nsec_from_epoch(self.previous_time_nsec_since_epoch)} {from_nsec_from_epoch(time_nsec_since_epoch)}"
)
self.out_of_order = True
# This is needed for the redis offset marker entry at the end of the log file
Expand All @@ -301,7 +295,7 @@ def prepare_write(
# to keep a full file's worth of data in the stream. This is what prevents continuous stream growth.
# Returns thread that moves log to bucket
def close_file(self, take_mutex=True):
threads = []
thread = None
if take_mutex:
self.mutex.acquire()
try:
Expand All @@ -313,7 +307,8 @@ def close_file(self, take_mutex=True):
# Cleanup timestamps here so they are unset for the next file
self.first_time = None
self.last_time = None
threads.append(BucketUtilities.move_log_file_to_bucket(self.filename, bucket_key))
thread = BucketUtilities.move_log_file_to_bucket(self.filename, bucket_key)
thread.join()
# Now that the file is in storage, trim the Redis stream after a delay
self.cleanup_offsets.append({})
for redis_topic, last_offset in self.last_offsets:
Expand All @@ -328,7 +323,6 @@ def close_file(self, take_mutex=True):
finally:
if take_mutex:
self.mutex.release()
return threads

def bucket_filename(self):
return f"{self.first_timestamp()}__{self.last_timestamp()}" + self.extension()
Expand Down
20 changes: 10 additions & 10 deletions openc3/python/openc3/microservices/interface_microservice.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -81,11 +81,11 @@ def start(self):
self.thread.start()
return self.thread

def stop(self):
kill_thread(self, self.thread)
def stop(self, timeout=1.0):
kill_thread(self, self.thread, timeout=timeout)

def graceful_kill(self):
InterfaceTopic.shutdown(self.interface, scope=self.scope)
def graceful_kill(self, timeout=1.0):
InterfaceTopic.shutdown(self.interface, timeout=timeout, scope=self.scope)
time.sleep(0.001) # Allow other threads to run

def run(self):
Expand Down Expand Up @@ -356,11 +356,11 @@ def start(self):
self.thread.start()
return self.thread

def stop(self):
kill_thread(self, self.thread)
def stop(self, timeout=1.0):
kill_thread(self, self.thread, timeout=timeout)

def graceful_kill(self):
RouterTopic.shutdown(self.router, scope=self.scope)
def graceful_kill(self, timeout=1.0):
RouterTopic.shutdown(self.router, timeout=timeout, scope=self.scope)
time.sleep(0.001) # Allow other threads to run

def run(self):
Expand Down Expand Up @@ -815,7 +815,7 @@ def shutdown(self, sig=None):
self.stop()
super().shutdown()

def graceful_kill(self):
def graceful_kill(self, timeout=1.0):
pass # Just to avoid warning


Expand Down
14 changes: 5 additions & 9 deletions openc3/python/openc3/top_level.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -93,17 +93,13 @@ def set_working_dir(working_dir):
# Attempt to gracefully kill a thread
# @param owner Object that owns the thread and may have a graceful_kill method
# @param thread The thread to gracefully kill
# @param graceful_timeout Timeout in seconds to wait for it to die gracefully
# @param timeout_interval How often to poll for aliveness
# @param hard_timeout Timeout in seconds to wait for it to die ungracefully
def kill_thread(owner, thread, graceful_timeout=1, timeout_interval=0.01, hard_timeout=1):
# @param timeout Timeout in seconds to wait for it to die
def kill_thread(owner, thread, timeout=1.0):
if thread:
if owner and hasattr(owner, "graceful_kill"):
if threading.current_thread() != thread:
owner.graceful_kill()
end_time = time.time() + graceful_timeout
while thread.is_alive() and ((end_time - time.time()) > 0):
time.sleep(timeout_interval)
owner.graceful_kill(timeout=timeout)
thread.join(timeout=timeout)
else:
Logger.warn("Threads cannot graceful_kill themselves")
elif owner:
Expand Down
5 changes: 2 additions & 3 deletions openc3/python/openc3/topics/decom_interface_topic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand All @@ -23,7 +23,7 @@

class DecomInterfaceTopic(Topic):
@classmethod
def build_cmd(cls, target_name, cmd_name, cmd_params, range_check, raw, scope=OPENC3_SCOPE):
def build_cmd(cls, target_name, cmd_name, cmd_params, range_check, raw, timeout=5, scope=OPENC3_SCOPE):
data = {}
data["target_name"] = target_name.upper()
data["cmd_name"] = cmd_name.upper()
Expand All @@ -41,7 +41,6 @@ def build_cmd(cls, target_name, cmd_name, cmd_params, range_check, raw, scope=OP
"*",
100,
)
timeout = 5 # Arbitrary 5s timeout
start_time = time.time()
while (time.time() - start_time) < timeout:
for _topic, _msg_id, msg_hash, _redis in Topic.read_topics([ack_topic]):
Expand Down
4 changes: 2 additions & 2 deletions openc3/python/openc3/topics/interface_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ def stop_raw_logging(cls, interface_name, scope=OPENC3_SCOPE):
)

@classmethod
def shutdown(cls, interface, scope=OPENC3_SCOPE):
def shutdown(cls, interface, timeout=1.0, scope=OPENC3_SCOPE):
InterfaceTopic.while_receive_commands = False
Topic.write_topic(
f"{{{scope}__CMD}}INTERFACE__{interface.name}",
{"shutdown": "true"},
"*",
100,
)
time.sleep(1) # Give some time for the interface to shutdown
time.sleep(timeout) # Give some time for the interface to shutdown
InterfaceTopic.clear_topics(InterfaceTopic.topics(interface, scope=scope))

@classmethod
Expand Down
1 change: 0 additions & 1 deletion openc3/python/openc3/utilities/aws_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
s3_endpoint_url = f"https://s3.{AWS_REGION}.amazonaws.com"
s3_session = boto3.session.Session(region_name=AWS_REGION)


class AwsBucket(Bucket):
CREATE_CHECK_COUNT = 100 # 10 seconds

Expand Down
19 changes: 8 additions & 11 deletions openc3/python/openc3/utilities/bucket_utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand All @@ -14,17 +14,13 @@
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.utilities.bucket import Bucket

# from openc3.utilities.target_file import TargetFile
from openc3.utilities.logger import Logger
from openc3.models.reducer_model import ReducerModel
from openc3.environment import OPENC3_LOGS_BUCKET
import zlib
import os
import zlib
import time
import threading

from openc3.utilities.bucket import Bucket
from openc3.utilities.logger import Logger
from openc3.models.reducer_model import ReducerModel

class BucketUtilities:
FILE_TIMESTAMP_FORMAT = "%Y%m%d%H%M%S%N"
Expand Down Expand Up @@ -66,6 +62,7 @@ def move_log_file_to_bucket_thread(cls, filename, bucket_key, metadata={}):
filename = cls.compress_file(filename)
bucket_key += ".gz"

bucket_name = os.environ.get('OPENC3_LOGS_BUCKET')
retry_count = 0
while retry_count < 3:
try:
Expand All @@ -74,7 +71,7 @@ def move_log_file_to_bucket_thread(cls, filename, bucket_key, metadata={}):
# to be held in memory!
with open(filename, "rb") as file:
client.put_object(
bucket=OPENC3_LOGS_BUCKET,
bucket=bucket_name,
key=bucket_key,
body=file,
metadata=metadata,
Expand All @@ -88,7 +85,7 @@ def move_log_file_to_bucket_thread(cls, filename, bucket_key, metadata={}):
Logger.warn(f"Error saving log file to bucket - retry {retry_count}: {filename}\n{str(err)}")
time.sleep(1)

Logger.debug(f"wrote {OPENC3_LOGS_BUCKET}/{bucket_key}")
Logger.debug(f"wrote {bucket_name}/{bucket_key}")
ReducerModel.add_file(bucket_key) # Record the new file for data reduction

if orig_filename:
Expand Down
4 changes: 2 additions & 2 deletions openc3/python/openc3/utilities/metric.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2023 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -112,7 +112,7 @@ def shutdown(self):
kill_thread(self, Metric.update_thread)
Metric.update_thread = None

def graceful_kill(self):
def graceful_kill(self, timeout):
pass

@classmethod
Expand Down
16 changes: 6 additions & 10 deletions openc3/python/openc3/utilities/store_queued.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand Down Expand Up @@ -27,17 +27,13 @@
# Attempt to gracefully kill a thread
# @param owner Object that owns the thread and may have a graceful_kill method
# @param thread The thread to gracefully kill
# @param graceful_timeout Timeout in seconds to wait for it to die gracefully
# @param timeout_interval How often to poll for aliveness
# @param hard_timeout Timeout in seconds to wait for it to die ungracefully
def kill_thread(owner, thread, graceful_timeout=1, timeout_interval=0.01, hard_timeout=1):
# @param timeout Timeout in seconds to wait for it to die gracefully
def kill_thread(owner, thread, timeout=1.0):
if thread:
if owner and hasattr(owner, "graceful_kill"):
if threading.current_thread() != thread:
owner.graceful_kill()
end_time = time.time() + graceful_timeout
while thread.is_alive() and ((end_time - time.time()) > 0):
time.sleep(timeout_interval)
owner.graceful_kill(timeout=timeout)
thread.join(timeout=timeout)


class StoreQueued(metaclass=StoreMeta):
Expand Down Expand Up @@ -119,7 +115,7 @@ def method(*args, **kwargs):
def store_instance(self):
return Store.instance()

def graceful_kill(self):
def graceful_kill(self, timeout):
# Do nothing
pass

Expand Down
Loading

0 comments on commit e80e41f

Please sign in to comment.