Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using full vector from mdp travel service #156

Merged
merged 6 commits into from
Apr 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ This produces a map with 9 nodes: `ChargingPoint` in the centre, with `v_-2`, `v
To start the executive framework, launch the following launch file.

```bash
roslaunch task_executor task-scheduler.launch
roslaunch task_executor task-scheduler-top.launch
```

This launches using topolgical navigation for moving the robot. If you wish to use the MDP execution (which has additional runtime dependencies) replace `top` with `mdp` in the launch file name.

## Running scheduled patrols

Expand Down
52 changes: 52 additions & 0 deletions task_executor/scripts/scheduled_task_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from strands_executive_msgs.msg import Task, ExecutionStatus, DurationMatrix, DurationList
from strands_executive_msgs.srv import GetSchedule
from task_executor.sm_base_executor import AbstractTaskExecutor
from task_executor.base_executor import BaseTaskExecutor
from threading import Thread
from task_executor.execution_schedule import ExecutionSchedule
from operator import attrgetter
Expand Down Expand Up @@ -173,6 +174,57 @@ def task_demanded(self, demanded_task, currently_active_task):


def get_duration_matrix(self, tasks):
"""
Creates the matrix of durations between waypoints needed as input to the scheuler.
Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element.
"""
if self.nav_service == BaseTaskExecutor.TOPOLOGICAL_NAV:
return self.get_duration_matrix_top_nav(tasks)
elif self.nav_service == BaseTaskExecutor.MDP_NAV:
return self.get_duration_matrix_mdp(tasks)
else:
raise RuntimeError('Unknown nav service: %s'% self.nav_service)


def get_duration_matrix_mdp(self, tasks):
"""
Creates the matrix of durations between waypoints needed as input to the scheuler.
Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element.
"""

# mdp returns vector of times from ALL START nodes to a SINGLE TARGET for a SINGLE TIME
# therefore we can combine calls from any start node to the same target for the same time


# used to cache reseults of calls to mdp service
travel_durations = dict()

# thing we need to return
dm = DurationMatrix()

for first_task in tasks:
dm.durations.append(DurationList())
for second_task in tasks:
start = first_task.end_node_id
epoch = second_task.start_after
target = second_task.start_node_id

# we need to get the travel_duration for this tuple if we haven't seen the time before or the target for this time before
if epoch not in travel_durations or target not in travel_durations[epoch]:
if epoch not in travel_durations:
travel_durations[epoch] = dict()
# call mdp duration service
resp = self.get_mdp_vector(target, epoch)
travel_durations[epoch][target] = resp
else:
# rospy.loginfo('Saving a call: %s %s %s', start, epoch.secs, target)
resp = travel_durations[epoch][target]

dm.durations[-1].durations.append(resp.travel_times[resp.source_waypoints.index(start)])

return dm

def get_duration_matrix_top_nav(self, tasks):
"""
Creates the matrix of durations between waypoints needed as input to the scheuler.
Output is a DurationMatrix encoding duration[i][j] where this is the duration expected for travelling between the end of the ith task in tasks and the start of the jth element.
Expand Down
41 changes: 29 additions & 12 deletions task_executor/src/task_executor/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ def __init__(self):
rospy.Subscriber('/closest_node', String, self.update_topological_closest_node)
self.active_task = None
self.active_task_completes_by = rospy.get_rostime()
self.logging_lock = threading.Lock()
self.service_lock = threading.Lock()
self.expected_time_lock = threading.Lock()
self.expected_time_lock = threading.RLock()

self.task_event_publisher = rospy.Publisher('task_executor/events', TaskEvent, queue_size=20)

Expand Down Expand Up @@ -155,16 +156,26 @@ def create_expected_time_service(self):
raise RuntimeError('Unknown nav service: %s'% self.nav_service)


def get_mdp_vector(self, target, epoch):
try:
# expected_time_lock is reentrant
self.expected_time_lock.acquire()
self.create_expected_time_service()
return self.expected_time_srv(target_waypoint=target, epoch=epoch)
finally:
self.expected_time_lock.release()

def mdp_expected_time(self, start, end, task = None):


# if task is none, assume immediate execution
if task is None:
if task is None or task.start_after is None:
epoch = rospy.get_rostime()
# else take the epoch from the earliest execution time
else:
epoch = task.start_after

resp = self.expected_time_srv(target_waypoint=end, epoch=epoch)
resp = self.get_mdp_vector(end, epoch)

return resp.travel_times[resp.source_waypoints.index(start)]

Expand Down Expand Up @@ -200,23 +211,29 @@ def get_navigation_duration(self, start, end, task = None):


def log_task_events(self, tasks, event, time, description=""):
for task in tasks:
te = TaskEvent(task=task, event=event, time=time, description=description)

try:
self.task_event_publisher.publish(te)
self.logging_msg_store.insert(te)
except Exception, e:
rospy.logwarn('Caught exception when logging: %s' % e)
try:
self.logging_lock.acquire()
for task in tasks:
te = TaskEvent(task=task, event=event, time=time, description=description)
try:
self.task_event_publisher.publish(te)
self.logging_msg_store.insert(te)
except Exception, e:
rospy.logwarn('Caught exception when logging: %s' % e)
finally:
self.logging_lock.release()


def log_task_event(self, task, event, time, description=""):
te = TaskEvent(task=task, event=event, time=time, description=description)
try:
self.logging_lock.acquire()
te = TaskEvent(task=task, event=event, time=time, description=description)
self.task_event_publisher.publish(te)
self.logging_msg_store.insert(te)
except Exception, e:
rospy.logwarn('Caught exception when logging: %s' % e)
finally:
self.logging_lock.release()


def add_task_ros_srv(self, req):
Expand Down
4 changes: 2 additions & 2 deletions task_executor/src/task_executor/sm_base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ def execute_task(self, task):
else:
raise RuntimeError('Unknown nav service: %s'% self.nav_service)

# let nav run for three times the length it usually takes before terminating
monitor_duration = self.expected_navigation_duration_now(task.start_node_id) * 3
# let nav run for 1.5 times the length it usually takes before terminating
monitor_duration = self.expected_navigation_duration_now(task.start_node_id) * 1.5

smach.Concurrence.add('MONITORED',
SimpleActionState(nav_action_name,
Expand Down