Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle task scheduling in ResumableSimulationDriver #1262

Merged
merged 2 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

public class ResumableSimulationDriver<Model> implements AutoCloseable {
Expand All @@ -52,6 +54,9 @@ public class ResumableSimulationDriver<Model> implements AutoCloseable {
//mapping each activity name to its task id (in String form) in the simulation engine
private final Map<ActivityDirectiveId, TaskId> plannedDirectiveToTask;

//subset of plannedDirectiveToTask to check for scheduling dependent tasks
private final Map<ActivityDirectiveId, TaskId> toCheckForDependencyScheduling;

//simulation results so far
private SimulationResults lastSimResults;
//cached simulation results cover the period [Duration.ZERO, lastSimResultsEnd]
Expand All @@ -71,6 +76,7 @@ public ResumableSimulationDriver(
){
this.missionModel = missionModel;
plannedDirectiveToTask = new HashMap<>();
toCheckForDependencyScheduling = new HashMap<>();
this.planDuration = planDuration;
countSimulationRestarts = 0;
this.canceledListener = canceledListener;
Expand All @@ -94,6 +100,7 @@ private void printTimeSpent(){
printTimeSpent();
durationSinceRestart = 0;
plannedDirectiveToTask.clear();
toCheckForDependencyScheduling.clear();
lastSimResults = null;
lastSimResultsEnd = Duration.ZERO;
long before = System.nanoTime();
Expand Down Expand Up @@ -273,13 +280,14 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
schedule).compute();
// Filter out activities that are before the plan start
resolved = StartOffsetReducer.filterOutNegativeStartOffset(resolved);

final var toSchedule = new HashSet<ActivityDirectiveId>();
toSchedule.add(null);
scheduleActivities(
toSchedule,
schedule,
resolved,
missionModel,
engine,
activityTopic
engine
);

var allTaskFinished = false;
Expand All @@ -303,6 +311,8 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
final var commit = engine.performJobs(batch.jobs(), cells, curTime, Duration.MAX_VALUE);
timeline.add(commit);

scheduleActivities(getSuccessorsToSchedule(engine), schedule, resolved, missionModel, engine);

// all tasks are complete : do not exit yet, there might be event triggered at the same time
if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask
.values()
Expand All @@ -328,93 +338,65 @@ private void simulateSchedule(final Map<ActivityDirectiveId, ActivityDirective>
* @return its duration if the activity has been simulated and has finished simulating, an IllegalArgumentException otherwise
*/
public Optional<Duration> getActivityDuration(ActivityDirectiveId activityDirectiveId){
//potential cause of non presence: (1) activity is outside plan bounds (2) activity has not been simulated yet
if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty();
return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId));
}

private Set<ActivityDirectiveId> getSuccessorsToSchedule(final SimulationEngine engine) {
final var toSchedule = new HashSet<ActivityDirectiveId>();
final var iterator = toCheckForDependencyScheduling.entrySet().iterator();
while(iterator.hasNext()){
final var taskToCheck = iterator.next();
if(engine.isTaskComplete(taskToCheck.getValue())){
toSchedule.add(taskToCheck.getKey());
iterator.remove();
}
}
return toSchedule;
}

private void scheduleActivities(
final Map<ActivityDirectiveId, ActivityDirective> schedule,
final Set<ActivityDirectiveId> toScheduleNow,
final Map<ActivityDirectiveId, ActivityDirective> completeSchedule,
final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved,
final MissionModel<Model> missionModel,
final SimulationEngine engine,
final Topic<ActivityDirectiveId> activityTopic
)
{
if(resolved.get(null) == null) { return; } // Nothing to simulate

for (final Pair<ActivityDirectiveId, Duration> directivePair : resolved.get(null)) {
final var directiveId = directivePair.getLeft();
final var startOffset = directivePair.getRight();
final var serializedDirective = schedule.get(directiveId).serializedActivity();

final TaskFactory<?> task;
try {
task = missionModel.getTaskFactory(serializedDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDirective.getTypeName(), ex.toString()));
final SimulationEngine engine){
for(final var predecessor: toScheduleNow) {
for (final var directivePair : resolved.get(predecessor)) {
final var offset = directivePair.getRight();
final var directiveIdToSchedule = directivePair.getLeft();
final var serializedDirective = completeSchedule.get(directiveIdToSchedule).serializedActivity();
final TaskFactory<?> task;
try {
task = missionModel.getTaskFactory(serializedDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDirective.getTypeName(), ex.toString()));
}
Duration computedStartTime = offset;
if (predecessor != null) {
computedStartTime = (curTime.isEqualTo(Duration.MIN_VALUE) ? Duration.ZERO : curTime).plus(offset);
}
final var taskId = engine.scheduleTask(
computedStartTime,
makeTaskFactory(directiveIdToSchedule, task, activityTopic));
plannedDirectiveToTask.put(directiveIdToSchedule, taskId);
if (resolved.containsKey(directiveIdToSchedule)) {
toCheckForDependencyScheduling.put(directiveIdToSchedule, taskId);
}
}

final var taskId = engine.scheduleTask(startOffset, makeTaskFactory(
directiveId,
task,
schedule,
resolved,
missionModel,
activityTopic
));
plannedDirectiveToTask.put(directiveId,taskId);
}
}

private static <Model, Output> TaskFactory<Unit> makeTaskFactory(
private static <Output> TaskFactory<Output> makeTaskFactory(
final ActivityDirectiveId directiveId,
final TaskFactory<Output> task,
final Map<ActivityDirectiveId, ActivityDirective> schedule,
final HashMap<ActivityDirectiveId, List<Pair<ActivityDirectiveId, Duration>>> resolved,
final MissionModel<Model> missionModel,
final Topic<ActivityDirectiveId> activityTopic
)
{
// Emit the current activity (defined by directiveId)
return executor -> scheduler0 -> TaskStatus.calling((TaskFactory<Output>) (executor1 -> scheduler1 -> {
scheduler1.emit(directiveId, activityTopic);
return task.create(executor1).step(scheduler1);
}), scheduler2 -> {
// When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time
final List<Pair<ActivityDirectiveId, Duration>> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId);
// Iterate over the dependents
for (final var dependent : dependents) {
scheduler2.spawn(executor2 -> scheduler3 ->
// Delay until the dependent starts
TaskStatus.delayed(dependent.getRight(), scheduler4 -> {
final var dependentDirectiveId = dependent.getLeft();
final var serializedDependentDirective = schedule.get(dependentDirectiveId).serializedActivity();

// Initialize the Task for the dependent
final TaskFactory<?> dependantTask;
try {
dependantTask = missionModel.getTaskFactory(serializedDependentDirective);
} catch (final InstantiationException ex) {
// All activity instantiations are assumed to be validated by this point
throw new Error("Unexpected state: activity instantiation %s failed with: %s"
.formatted(serializedDependentDirective.getTypeName(), ex.toString()));
}

// Schedule the dependent
// When it finishes, it will schedule the activities depending on it to know their start time
scheduler4.spawn(makeTaskFactory(
dependentDirectiveId,
dependantTask,
schedule,
resolved,
missionModel,
activityTopic
));
return TaskStatus.completed(Unit.UNIT);
}));
}
return TaskStatus.completed(Unit.UNIT);
});
final Topic<ActivityDirectiveId> activityTopic) {
return executor -> scheduler -> {
scheduler.emit(directiveId, activityTopic);
return task.create(executor).step(scheduler);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ public Duration valueAt(Duration start, final EquationSolvingAlgorithms.History<
if(computedDuration.isPresent()) {
history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, start.plus(computedDuration.get())), new ActivityMetadata(actToSim));
} else{
logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity.");
logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity or activity outside plan bounds.");
history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, null), new ActivityMetadata(actToSim));
}
} catch (SimulationFacade.SimulationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,21 @@ public void activitiesAnchoredToOtherActivities() throws SchedulingInterruptedEx
assertEquals(1, driver.getCountSimulationRestarts());
}

@Test
@DisplayName("Reference to anchored activities are correctly maintained by the driver")
public void activitiesAnchoredToOtherActivitiesSimple() throws SchedulingInterruptedException {
final var activitiesToSimulate = new HashMap<ActivityDirectiveId, ActivityDirective>(2);
activitiesToSimulate.put(
new ActivityDirectiveId(0),
new ActivityDirective(oneMinute, serializedDelayDirective, null, true));
activitiesToSimulate.put(
new ActivityDirectiveId(1),
new ActivityDirective(oneMinute, serializedDelayDirective, new ActivityDirectiveId(0), false));
driver.simulateActivities(activitiesToSimulate);
final var durationOfAnchoredActivity = driver.getActivityDuration(new ActivityDirectiveId(1));
assertTrue(durationOfAnchoredActivity.isPresent());
}

@Test
@DisplayName("Decomposition and anchors do not interfere with each other")
public void decomposingActivitiesAndAnchors() throws SchedulingInterruptedException{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2695,15 +2695,14 @@ void testRelativeActivityPlanPositiveStartOffsetStart() {
new ActivityDirectiveId(2L),
new ActivityDirective(
tenMinutes,
"GrowBanana",
"PickBanana",
Map.of(
"quantity", SerializedValue.of(1),
"growingDuration", SerializedValue.of(activityDuration.in(Duration.MICROSECONDS))),
"quantity", SerializedValue.of(1)),
new ActivityDirectiveId(1L),
true)),
List.of(new SchedulingGoal(new GoalId(0L), """
export default () => Goal.CoexistenceGoal({
forEach: ActivityExpression.ofType(ActivityTypes.GrowBanana),
forEach: ActivityExpression.ofType(ActivityTypes.PickBanana),
activityTemplate: ActivityTemplates.PeelBanana({peelDirection: "fromStem"}),
startsAt: TimingConstraint.singleton(WindowProperty.START).plus(Temporal.Duration.from({ minutes : 5}))
})
Expand All @@ -2724,20 +2723,20 @@ export default () => Goal.CoexistenceGoal({

final var planByActivityType = partitionByActivityType(results.updatedPlan());
final var peelBananas = planByActivityType.get("PeelBanana");
final var growBananas = planByActivityType.get("GrowBanana");
final var pickBananas = planByActivityType.get("PickBanana");
final var durationParamActivities = planByActivityType.get("DurationParameterActivity");

assertEquals(1, peelBananas.size());
assertEquals(1, growBananas.size());
assertEquals(1, pickBananas.size());
assertEquals(1, durationParamActivities.size());
final var peelBanana = peelBananas.iterator().next();
final var growBanana = growBananas.iterator().next();
final var pickBanana = pickBananas.iterator().next();
final var durationParamActivity = durationParamActivities.iterator().next();

assertEquals(Duration.ZERO, durationParamActivity.startOffset());

assertEquals(tenMinutes, growBanana.startOffset());
assertEquals(SerializedValue.of(1), growBanana.serializedActivity().getArguments().get("quantity"));
assertEquals(tenMinutes, pickBanana.startOffset());
assertEquals(SerializedValue.of(1), pickBanana.serializedActivity().getArguments().get("quantity"));

assertEquals(Duration.of(15, Duration.MINUTES), peelBanana.startOffset());
assertEquals(SerializedValue.of("fromStem"), peelBanana.serializedActivity().getArguments().get("peelDirection"));
Expand Down
Loading