From eef5cd1cc35ff37f13a09a3cd13e3f57300fe6c3 Mon Sep 17 00:00:00 2001
From: Matthew Dailis
Date: Mon, 1 Apr 2024 21:26:12 -0700
Subject: [PATCH 01/43] Add Task combinators to simplify makeTaskFactory
---
.../aerie/merlin/driver/SimulationDriver.java | 111 +++++++-------
.../jpl/aerie/merlin/protocol/model/Task.java | 144 +++++++++++++++++-
.../merlin/protocol/model/TaskFactory.java | 22 ++-
3 files changed, 214 insertions(+), 63 deletions(-)
diff --git a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java
index bf0a8cbe3c..900e7da930 100644
--- a/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java
+++ b/merlin-driver/src/main/java/gov/nasa/jpl/aerie/merlin/driver/SimulationDriver.java
@@ -5,16 +5,16 @@
import gov.nasa.jpl.aerie.merlin.driver.timeline.LiveCells;
import gov.nasa.jpl.aerie.merlin.driver.timeline.TemporalEventSource;
import gov.nasa.jpl.aerie.merlin.protocol.driver.Topic;
+import gov.nasa.jpl.aerie.merlin.protocol.model.Task;
import gov.nasa.jpl.aerie.merlin.protocol.model.TaskFactory;
import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
-import gov.nasa.jpl.aerie.merlin.protocol.types.InSpan;
import gov.nasa.jpl.aerie.merlin.protocol.types.InstantiationException;
-import gov.nasa.jpl.aerie.merlin.protocol.types.TaskStatus;
import gov.nasa.jpl.aerie.merlin.protocol.types.Unit;
import org.apache.commons.lang3.tuple.Pair;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -209,30 +209,23 @@ void simulateTask(final MissionModel missionModel, final TaskFactory void scheduleActivities(
final Map schedule,
final HashMap>> resolved,
final MissionModel missionModel,
final SimulationEngine engine,
final Topic activityTopic
- )
- {
- if(resolved.get(null) == null) { return; } // Nothing to simulate
-
+ ) {
+ if (resolved.get(null) == null) {
+ // Nothing to simulate
+ return;
+ }
for (final Pair directivePair : resolved.get(null)) {
final var directiveId = directivePair.getLeft();
final var startOffset = directivePair.getRight();
final var serializedDirective = schedule.get(directiveId).serializedActivity();
- final TaskFactory> task;
- try {
- task = missionModel.getTaskFactory(serializedDirective);
- } catch (final InstantiationException ex) {
- // All activity instantiations are assumed to be validated by this point
- throw new Error("Unexpected state: activity instantiation %s failed with: %s"
- .formatted(serializedDirective.getTypeName(), ex.toString()));
- }
+ final TaskFactory> task = deserializeActivity(missionModel, serializedDirective);
engine.scheduleTask(startOffset, makeTaskFactory(
directiveId,
@@ -247,52 +240,54 @@ private static void scheduleActivities(
private static TaskFactory makeTaskFactory(
final ActivityDirectiveId directiveId,
- final TaskFactory
*/
default void release() {}
+
+ default Task andThen(Task task2) {
+ return new Task<>() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ switch (Task.this.step(scheduler)) {
+ case TaskStatus.Completed> s -> {
+ return task2.step(scheduler);
+ }
+ case TaskStatus.AwaitingCondition> s -> {
+ return new TaskStatus.AwaitingCondition<>(s.condition(), s.continuation().andThen(task2));
+ }
+ case TaskStatus.CallingTask> s -> {
+ return new TaskStatus.CallingTask<>(s.childSpan(), s.child(), s.continuation().andThen(task2));
+ }
+ case TaskStatus.Delayed> s -> {
+ return new TaskStatus.Delayed<>(s.delay(), s.continuation().andThen(task2));
+ }
+ }
+ }
+ };
+ }
+
+ default Task dropOutput() {
+ return new Task<>() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ switch (this.step(scheduler)) {
+ case TaskStatus.Completed> s -> {
+ return TaskStatus.completed(Unit.UNIT);
+ }
+ case TaskStatus.AwaitingCondition> s -> {
+ return new TaskStatus.AwaitingCondition<>(s.condition(), s.continuation().dropOutput());
+ }
+ case TaskStatus.CallingTask> s -> {
+ return new TaskStatus.CallingTask<>(s.childSpan(), s.child(), s.continuation().dropOutput());
+ }
+ case TaskStatus.Delayed> s -> {
+ return new TaskStatus.Delayed<>(s.delay(), s.continuation().dropOutput());
+ }
+ }
+ }
+ };
+ }
+
+ static Task calling(Task task) {
+ return new Task() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ return TaskStatus.calling(InSpan.Parent, (TaskFactory < Output >)executor -> task, Task.empty());
+ }
+ };
+ }
+
+ static Task callingWithSpan(Task task) {
+ return new Task() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ return TaskStatus.calling(InSpan.Fresh, (TaskFactory) executor -> task, Task.empty());
+ }
+ };
+ }
+
+ static Task delaying(Duration duration) {
+ return Task.of($ -> TaskStatus.delayed(duration, Task.empty()));
+ }
+
+ static Task emitting(EventType eventType, Topic topic) {
+ return Task.run($ -> $.emit(eventType, topic));
+ }
+
+ static Task spawning(TaskFactory> taskFactory) {
+ return Task.run($ -> $.spawn(InSpan.Parent, taskFactory));
+ }
+
+ static Task spawning(Consumer f) {
+ return Task.run($ -> $.spawn(InSpan.Parent, (TaskFactory) executor -> Task.run(f)));
+ }
+
+ static Task spawningWithSpan(TaskFactory> taskFactory) {
+ return Task.run($ -> $.spawn(InSpan.Fresh, taskFactory));
+ }
+
+ static Task spawningWithSpan(Consumer f) {
+ return Task.run($ -> $.spawn(InSpan.Fresh, (TaskFactory) executor -> Task.run(f)));
+ }
+
+ static Task spawning(List> tasks) {
+ return Task.run($ -> {
+ for (final var task : tasks) {
+ $.spawn(InSpan.Fresh, task);
+ }
+ });
+ }
+
+ /**
+ * @param f Must not yield
+ * @return
+ */
+ static Task run(Consumer f) {
+ return Task.evaluate(scheduler -> {
+ f.accept(scheduler);
+ return Unit.UNIT;
+ });
+ }
+
+ static Task evaluate(Function f) {
+ return new Task<>() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ return TaskStatus.completed(f.apply(scheduler));
+ }
+ };
+ }
+
+ static Task empty() {
+ return new Task<>() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ return TaskStatus.completed(Unit.UNIT);
+ }
+ };
+ }
+
+ static Task of(Function> f) {
+ return new Task() {
+ @Override
+ public TaskStatus step(final Scheduler scheduler) {
+ return f.apply(scheduler);
+ }
+ };
+ }
}
diff --git a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/TaskFactory.java b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/TaskFactory.java
index 4ec05c866c..1de8ab53bc 100644
--- a/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/TaskFactory.java
+++ b/merlin-sdk/src/main/java/gov/nasa/jpl/aerie/merlin/protocol/model/TaskFactory.java
@@ -1,13 +1,29 @@
package gov.nasa.jpl.aerie.merlin.protocol.model;
+import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
+import gov.nasa.jpl.aerie.merlin.protocol.types.Unit;
+
import java.util.concurrent.Executor;
/**
* A factory for creating fresh copies of a task. All tasks created by a factory must be observationally equivalent.
*
- * @param
+ * @param
* The type of data returned by a task created by this factory.
*/
-public interface TaskFactory {
- Task create(Executor executor);
+public interface TaskFactory {
+ Task create(Executor executor);
+
+ static TaskFactory delaying(Duration duration) {
+ return executor -> Task.delaying(duration);
+ }
+
+ default TaskFactory andThen(TaskFactory task) {
+ return executor -> {
+ final var task1 = this.create(executor);
+ final var task2 = task.create(executor);
+
+ return task1.andThen(task2);
+ };
+ }
}
From fb8b4576235ca3b1dbafefdf81030fd0714c6a91 Mon Sep 17 00:00:00 2001
From: Matthew Dailis
Date: Mon, 23 Oct 2023 08:09:58 -0700
Subject: [PATCH 02/43] Track breadcrumbs in threaded tasks
---
.../merlin/framework/ThreadedReactionContext.java | 10 ++++++++--
.../nasa/jpl/aerie/merlin/framework/ThreadedTask.java | 7 ++++++-
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java
index d4bc94e559..9ddd9cde45 100644
--- a/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java
+++ b/merlin-framework/src/main/java/gov/nasa/jpl/aerie/merlin/framework/ThreadedReactionContext.java
@@ -8,6 +8,7 @@
import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
import gov.nasa.jpl.aerie.merlin.protocol.types.InSpan;
+import java.util.List;
import java.util.Objects;
import java.util.function.Function;
@@ -16,15 +17,18 @@ final class ThreadedReactionContext implements Context {
private final Scoped rootContext;
private final TaskHandle handle;
private Scheduler scheduler;
+ private List