diff --git a/README.md b/README.md index aea2b10..f058873 100644 --- a/README.md +++ b/README.md @@ -39,9 +39,10 @@ This produces a map with 9 nodes: `ChargingPoint` in the centre, with `v_-2`, `v To start the executive framework, launch the following launch file. ```bash -roslaunch task_executor task-scheduler.launch +roslaunch task_executor task-scheduler-top.launch ``` +This launches using topolgical navigation for moving the robot. If you wish to use the MDP execution (which has additional runtime dependencies) replace `top` with `mdp` in the launch file name. ## Running scheduled patrols diff --git a/task_executor/scripts/scheduled_task_executor.py b/task_executor/scripts/scheduled_task_executor.py index 4635719..51e9ce4 100755 --- a/task_executor/scripts/scheduled_task_executor.py +++ b/task_executor/scripts/scheduled_task_executor.py @@ -5,6 +5,7 @@ from strands_executive_msgs.msg import Task, ExecutionStatus, DurationMatrix, DurationList from strands_executive_msgs.srv import GetSchedule from task_executor.sm_base_executor import AbstractTaskExecutor +from task_executor.base_executor import BaseTaskExecutor from threading import Thread from task_executor.execution_schedule import ExecutionSchedule from operator import attrgetter @@ -173,6 +174,57 @@ def task_demanded(self, demanded_task, currently_active_task): def get_duration_matrix(self, tasks): + """ + Creates the matrix of durations between waypoints needed as input to the scheuler. + Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element. + """ + if self.nav_service == BaseTaskExecutor.TOPOLOGICAL_NAV: + return self.get_duration_matrix_top_nav(tasks) + elif self.nav_service == BaseTaskExecutor.MDP_NAV: + return self.get_duration_matrix_mdp(tasks) + else: + raise RuntimeError('Unknown nav service: %s'% self.nav_service) + + + def get_duration_matrix_mdp(self, tasks): + """ + Creates the matrix of durations between waypoints needed as input to the scheuler. + Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element. + """ + + # mdp returns vector of times from ALL START nodes to a SINGLE TARGET for a SINGLE TIME + # therefore we can combine calls from any start node to the same target for the same time + + + # used to cache reseults of calls to mdp service + travel_durations = dict() + + # thing we need to return + dm = DurationMatrix() + + for first_task in tasks: + dm.durations.append(DurationList()) + for second_task in tasks: + start = first_task.end_node_id + epoch = second_task.start_after + target = second_task.start_node_id + + # we need to get the travel_duration for this tuple if we haven't seen the time before or the target for this time before + if epoch not in travel_durations or target not in travel_durations[epoch]: + if epoch not in travel_durations: + travel_durations[epoch] = dict() + # call mdp duration service + resp = self.get_mdp_vector(target, epoch) + travel_durations[epoch][target] = resp + else: + # rospy.loginfo('Saving a call: %s %s %s', start, epoch.secs, target) + resp = travel_durations[epoch][target] + + dm.durations[-1].durations.append(resp.travel_times[resp.source_waypoints.index(start)]) + + return dm + + def get_duration_matrix_top_nav(self, tasks): """ Creates the matrix of durations between waypoints needed as input to the scheuler. Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element. diff --git a/task_executor/src/task_executor/base_executor.py b/task_executor/src/task_executor/base_executor.py index f1b7420..95da7c6 100755 --- a/task_executor/src/task_executor/base_executor.py +++ b/task_executor/src/task_executor/base_executor.py @@ -86,8 +86,9 @@ def __init__(self): rospy.Subscriber('/closest_node', String, self.update_topological_closest_node) self.active_task = None self.active_task_completes_by = rospy.get_rostime() + self.logging_lock = threading.Lock() self.service_lock = threading.Lock() - self.expected_time_lock = threading.Lock() + self.expected_time_lock = threading.RLock() self.task_event_publisher = rospy.Publisher('task_executor/events', TaskEvent, queue_size=20) @@ -155,16 +156,26 @@ def create_expected_time_service(self): raise RuntimeError('Unknown nav service: %s'% self.nav_service) + def get_mdp_vector(self, target, epoch): + try: + # expected_time_lock is reentrant + self.expected_time_lock.acquire() + self.create_expected_time_service() + return self.expected_time_srv(target_waypoint=target, epoch=epoch) + finally: + self.expected_time_lock.release() + def mdp_expected_time(self, start, end, task = None): + # if task is none, assume immediate execution - if task is None: + if task is None or task.start_after is None: epoch = rospy.get_rostime() # else take the epoch from the earliest execution time else: epoch = task.start_after - resp = self.expected_time_srv(target_waypoint=end, epoch=epoch) + resp = self.get_mdp_vector(end, epoch) return resp.travel_times[resp.source_waypoints.index(start)] @@ -200,23 +211,29 @@ def get_navigation_duration(self, start, end, task = None): def log_task_events(self, tasks, event, time, description=""): - for task in tasks: - te = TaskEvent(task=task, event=event, time=time, description=description) - - try: - self.task_event_publisher.publish(te) - self.logging_msg_store.insert(te) - except Exception, e: - rospy.logwarn('Caught exception when logging: %s' % e) + try: + self.logging_lock.acquire() + for task in tasks: + te = TaskEvent(task=task, event=event, time=time, description=description) + try: + self.task_event_publisher.publish(te) + self.logging_msg_store.insert(te) + except Exception, e: + rospy.logwarn('Caught exception when logging: %s' % e) + finally: + self.logging_lock.release() def log_task_event(self, task, event, time, description=""): - te = TaskEvent(task=task, event=event, time=time, description=description) try: + self.logging_lock.acquire() + te = TaskEvent(task=task, event=event, time=time, description=description) self.task_event_publisher.publish(te) self.logging_msg_store.insert(te) except Exception, e: rospy.logwarn('Caught exception when logging: %s' % e) + finally: + self.logging_lock.release() def add_task_ros_srv(self, req): diff --git a/task_executor/src/task_executor/sm_base_executor.py b/task_executor/src/task_executor/sm_base_executor.py index 3bdb0a2..6ec6862 100644 --- a/task_executor/src/task_executor/sm_base_executor.py +++ b/task_executor/src/task_executor/sm_base_executor.py @@ -268,8 +268,8 @@ def execute_task(self, task): else: raise RuntimeError('Unknown nav service: %s'% self.nav_service) - # let nav run for three times the length it usually takes before terminating - monitor_duration = self.expected_navigation_duration_now(task.start_node_id) * 3 + # let nav run for 1.5 times the length it usually takes before terminating + monitor_duration = self.expected_navigation_duration_now(task.start_node_id) * 1.5 smach.Concurrence.add('MONITORED', SimpleActionState(nav_action_name,