Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-memory checkpoint simulation #1323

Merged
merged 43 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
eef5cd1
Add Task combinators to simplify makeTaskFactory
mattdailis Apr 2, 2024
fb8b457
Track breadcrumbs in threaded tasks
mattdailis Oct 23, 2023
a0bccb9
Implement duplicate for SlabList
mattdailis Oct 23, 2023
1f3f6e7
Implement duplicate for Subscriptions
mattdailis Oct 23, 2023
5fdb95f
Implement duplicate for JobSchedule
mattdailis Oct 23, 2023
2aa64b2
Implement duplicate for Profile and ProfilingState
mattdailis Oct 23, 2023
c544949
Implement duplicate for Tasks
mattdailis Oct 23, 2023
1e1dac1
Implement duplicate for ExecutionState
mattdailis Oct 23, 2023
53939d8
Implement duplicate for SimulationEngine
mattdailis Oct 23, 2023
4bf0db0
Track unstarted tasks
mattdailis Oct 23, 2023
c46327b
Implement unscheduleAfter in SimulationEngine
mattdailis Oct 23, 2023
0c11d54
Add THREADED_TASK_CACHE_READS environment variable
mattdailis Oct 23, 2023
c7e8afb
Driver updates and tests
adrienmaillard Apr 11, 2024
20fd583
Forbid all operations except duplicate on closed sim engine
mattdailis Nov 7, 2023
1e32b65
Testing improvements
adrienmaillard Apr 11, 2024
78d289a
Implement extractResource in goals, time exprs, and sched conds
mattdailis Apr 2, 2024
96c9b67
Remove duration expressions
adrienmaillard Feb 6, 2024
83251a2
Allow plan duplication and replacement operations
adrienmaillard Feb 6, 2024
e09bc9e
Add parent to string representation of sched act directive
adrienmaillard Feb 6, 2024
ab444b2
Add cached engine store
adrienmaillard Apr 11, 2024
41aa0e8
Compute simulation for subset of resources + separate activity results
adrienmaillard Apr 11, 2024
8ad147d
Create checkpoint policy
adrienmaillard Feb 6, 2024
7a7d9ca
add cached engine store
adrienmaillard Apr 11, 2024
6e3360f
Add stop functions and separate results computation from simulation
adrienmaillard Apr 11, 2024
167dfaa
Use checkpoint sim in scheduling
mattdailis Apr 2, 2024
d04cc85
Update e2e tests
adrienmaillard Feb 7, 2024
ed0b88b
Close the last interval of constraint simulation results
adrienmaillard Feb 8, 2024
1fe9388
Mock simulation results if no resources are needed
adrienmaillard Feb 9, 2024
a184a4c
useless import
adrienmaillard Mar 14, 2024
0221988
Activate thread task cache in scheduling, keep it deactivated in merlin
adrienmaillard Mar 14, 2024
7b71fde
Refactor equals() of SimulationResults
adrienmaillard Mar 15, 2024
e70bb9b
Rename test
adrienmaillard Mar 15, 2024
25bb277
Remove unused imports
adrienmaillard Mar 15, 2024
9c52391
Put test mission model in its own class
adrienmaillard Mar 15, 2024
1a05eb6
Move OneShotTask in itw own class
adrienmaillard Mar 15, 2024
e7beb42
Update default configuration
adrienmaillard Mar 29, 2024
1d9aeea
Hand-schedule dependents to have a reference to their taskId
adrienmaillard Mar 29, 2024
164a913
Inline makeTaskFactory
mattdailis Apr 5, 2024
0fcb51a
Add default implementation of duplicate that throws UnsupportedOperat…
mattdailis Apr 11, 2024
5420579
Document MAX_NB_CACHED_SIMULATION_ENGINES and set minimum to 1
mattdailis Apr 11, 2024
9e7442c
The error lambda is now the first one of the class.
adrienmaillard Apr 12, 2024
bc4ce3b
Update StartOffsetReducer
adrienmaillard Jun 4, 2024
50ec917
Remove duplicates in Plan and Evaluation
adrienmaillard Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,33 +144,37 @@ public Optional<SerializedValue> valueAt(final Duration timepoint) {
}

public static DiscreteProfile fromSimulatedProfile(final List<ProfileSegment<SerializedValue>> simulatedProfile) {
return fromProfileHelper(Duration.ZERO, simulatedProfile, Optional::of);
return fromProfileHelper(Duration.ZERO, simulatedProfile, Optional::of, true);
}

public static DiscreteProfile fromExternalProfile(final Duration offsetFromPlanStart, final List<ProfileSegment<Optional<SerializedValue>>> externalProfile) {
return fromProfileHelper(offsetFromPlanStart, externalProfile, $ -> $);
return fromProfileHelper(offsetFromPlanStart, externalProfile, $ -> $, false);
}

private static <T> DiscreteProfile fromProfileHelper(
final Duration offsetFromPlanStart,
final List<ProfileSegment<T>> profile,
final Function<T, Optional<SerializedValue>> transform
final Function<T, Optional<SerializedValue>> transform,
final boolean close
) {
final var result = new IntervalMap.Builder<SerializedValue>();
var cursor = offsetFromPlanStart;
var c = 0;
for (final var pair: profile) {
final var nextCursor = cursor.plus(pair.extent());

final var value = transform.apply(pair.dynamics());
final Duration finalCursor = cursor;
final var isLast = c == profile.size() - 1;
value.ifPresent(
$ -> result.set(
Interval.between(finalCursor, Inclusive, nextCursor, Exclusive),
Interval.between(finalCursor, Inclusive, nextCursor, (close && isLast) ? Inclusive : Exclusive),
$
)
);

cursor = nextCursor;
c++;
}

return new DiscreteProfile(result.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,28 +175,30 @@ public LinearProfile shiftBy(final Duration duration) {
}

public static LinearProfile fromSimulatedProfile(final List<ProfileSegment<RealDynamics>> simulatedProfile) {
return fromProfileHelper(Duration.ZERO, simulatedProfile, Optional::of);
return fromProfileHelper(Duration.ZERO, simulatedProfile, Optional::of, true);
}

public static LinearProfile fromExternalProfile(final Duration offsetFromPlanStart, final List<ProfileSegment<Optional<RealDynamics>>> externalProfile) {
return fromProfileHelper(offsetFromPlanStart, externalProfile, $ -> $);
return fromProfileHelper(offsetFromPlanStart, externalProfile, $ -> $, false);
}

private static <T> LinearProfile fromProfileHelper(
final Duration offsetFromPlanStart,
final List<ProfileSegment<T>> profile,
final Function<T, Optional<RealDynamics>> transform
final Function<T, Optional<RealDynamics>> transform,
final boolean close
) {
final var result = new IntervalMap.Builder<LinearEquation>();
var cursor = offsetFromPlanStart;
var c = 0;
for (final var pair: profile) {
final var nextCursor = cursor.plus(pair.extent());

final var isLast = c == profile.size() - 1;
final var value = transform.apply(pair.dynamics());
final Duration finalCursor = cursor;
value.ifPresent(
$ -> result.set(
Interval.between(finalCursor, Inclusive, nextCursor, Exclusive),
Interval.between(finalCursor, Inclusive, nextCursor, (close && isLast) ? Inclusive :Exclusive),
new LinearEquation(
finalCursor,
$.initial,
Expand All @@ -206,6 +208,7 @@ private static <T> LinearProfile fromProfileHelper(
);

cursor = nextCursor;
c++;
}

return new LinearProfile(result.build());
Expand Down
1 change: 1 addition & 0 deletions deployment/Environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ See the [environment variables document](https://github.com/NASA-AMMOS/aerie-gat
| `SCHEDULER_DB_PASSWORD` | Password of the Scheduler DB User | `string` | |
| `SCHEDULER_OUTPUT_MODE` | How scheduler output is sent back to Aerie | `string` | UpdateInputPlanWithNewActivities |
| `SCHEDULER_RULES_JAR` | Jar file to load scheduling rules from (until user input to database) | `string` | /usr/src/app/merlin_file_store/scheduler_rules.jar |
| `MAX_NB_CACHED_SIMULATION_ENGINES` | The maximum number of simulation engines to cache in memory during a scheduling run. Must be at least 1 | `number` | 1 |

## Aerie Sequencing

Expand Down
1 change: 1 addition & 0 deletions deployment/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ services:
SCHEDULER_OUTPUT_MODE: UpdateInputPlanWithNewActivities
MERLIN_LOCAL_STORE: /usr/src/app/merlin_file_store
SCHEDULER_RULES_JAR: /usr/src/app/merlin_file_store/scheduler_rules.jar
MAX_NB_CACHED_SIMULATION_ENGINES: 1
JAVA_OPTS: >
-Dorg.slf4j.simpleLogger.log.com.zaxxer.hikari=INFO
-Dorg.slf4j.simpleLogger.logFile=System.err
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ services:
SCHEDULER_OUTPUT_MODE: UpdateInputPlanWithNewActivities
MERLIN_LOCAL_STORE: /usr/src/app/merlin_file_store
SCHEDULER_RULES_JAR: /usr/src/app/merlin_file_store/scheduler_rules.jar
MAX_NB_CACHED_SIMULATION_ENGINES: 1
JAVA_OPTS: >
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG
Expand All @@ -204,6 +205,7 @@ services:
SCHEDULER_OUTPUT_MODE: UpdateInputPlanWithNewActivities
MERLIN_LOCAL_STORE: /usr/src/app/merlin_file_store
SCHEDULER_RULES_JAR: /usr/src/app/merlin_file_store/scheduler_rules.jar
MAX_NB_CACHED_SIMULATION_ENGINES: 1
JAVA_OPTS: >
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
-Dorg.slf4j.simpleLogger.defaultLogLevel=DEBUG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import gov.nasa.jpl.aerie.e2e.types.Plan;
import gov.nasa.jpl.aerie.e2e.types.ProfileSegment;
import gov.nasa.jpl.aerie.e2e.types.SchedulingRequest.SchedulingStatus;
import gov.nasa.jpl.aerie.e2e.types.SimulationDataset;
import gov.nasa.jpl.aerie.e2e.types.ValueSchema;
import gov.nasa.jpl.aerie.e2e.utils.GatewayRequests;
import gov.nasa.jpl.aerie.e2e.utils.HasuraRequests;
Expand Down Expand Up @@ -381,22 +382,17 @@ void schedulingGoalPostsSimResults() throws IOException {
final var plan = hasura.getPlan(planId);
final var simResults = hasura.getSimulationDatasetByDatasetId(datasetId);

// All directive have their simulated activity
final var planActivities = plan.activityDirectives();
final var simActivities = simResults.activities();
assertEquals(4, planActivities.size());
assertEquals(planActivities.size(), simActivities.size());
for(int i = 0; i<planActivities.size(); ++i) {
boolean match = false;
for(int j = 0; j<simActivities.size(); ++j) {
if (planActivities.get(i).id() == simActivities.get(j).directiveId() &&
planActivities.get(i).startOffset().equals(simActivities.get(j).startOffset())) {
match = true;
break;
}
}
assertTrue(match);;
}
// All directive have their simulated activity
final var planActivities = plan.activityDirectives();
final var simActivities = simResults.activities();
planActivities.sort(Comparator.comparingInt(Plan.ActivityDirective::id));
simActivities.sort(Comparator.comparingInt(SimulationDataset.SimulatedActivity::directiveId));
assertEquals(4, planActivities.size());
assertEquals(planActivities.size(), simActivities.size());
for(int i = 0; i<planActivities.size(); ++i) {
assertEquals(planActivities.get(i).id(), simActivities.get(i).directiveId());
assertEquals(planActivities.get(i).startOffset(), simActivities.get(i).startOffset());
}

// All directive have their simulated activity
final var profiles = hasura.getProfiles(datasetId);
Expand Down Expand Up @@ -466,7 +462,7 @@ void schedulingGoalPostsSimResults() throws IOException {
* and it only has satisfied goals
*/
@Test
void schedulingPostsSimResultsWithSimulation() throws IOException {
void schedulingDoesNotPostSimResultsWithSimulation() throws IOException {
insertActivities();
insertSatisfyingActivities();
final int coexistenceGoalId = hasura.createSchedulingSpecGoal(
Expand All @@ -475,9 +471,10 @@ void schedulingPostsSimResultsWithSimulation() throws IOException {
schedulingSpecId,
0);
try{
hasura.awaitSimulation(planId);
final var response = hasura.awaitSimulation(planId);
final var simDataset = hasura.getSimulationDataset(response.simDatasetId());
final var schedulingResults = hasura.awaitScheduling(schedulingSpecId);
assertNull(schedulingResults.datasetId());
assertEquals(schedulingResults.datasetId(), simDataset.datasetId());
} finally {
// Teardown: Delete Goal
hasura.deleteSchedulingGoal(coexistenceGoalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ void noDirectiveIdOnDaemon() throws IOException {
// The trace starts at the original exception and doesn't include the intermediary SimulationException
final var expectedStart = """
java.lang.RuntimeException: Daemon task exception raised.
\tat gov.nasa.jpl.aerie.foomissionmodel.Mission.lambda$new$1(Mission.java""";
\tat gov.nasa.jpl.aerie.foomissionmodel.Mission.lambda$new$0(Mission.java""";
assertTrue(reason.trace().startsWith(expectedStart));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ public record SimulationDataset(
boolean canceled,
String simStartTime,
String simEndTime,
List<SimulatedActivity> activities) {
List<SimulatedActivity> activities,
Integer datasetId) {
public record SimulatedActivity(
int spanId,
Integer directiveId,
Expand Down Expand Up @@ -58,7 +59,8 @@ public static SimulationDataset fromJSON(JsonObject json) {
json.getBoolean("canceled"),
json.getString("simulation_start_time"),
json.getString("simulation_end_time"),
simActivities);
simActivities,
json.getInt("dataset_id"));
}

public enum SimulationStatus{ pending, incomplete, failed, success }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ query GetSimulationDataset($id: Int!) {
status
reason
canceled
dataset_id
simulation_start_time
simulation_end_time
simulated_activities {
Expand All @@ -453,6 +454,7 @@ query GetSimulationDataset($id: Int!) {
status
reason
canceled
dataset_id
simulation_start_time
simulation_end_time
simulated_activities {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class Mission {

public final TimeTrackerDaemon timeTrackerDaemon = new TimeTrackerDaemon();

public final Counter<Integer> counter = Counter.ofInteger();

public Mission(final Registrar registrar, final Instant planStart, final Configuration config) {
this.cachedRegistrar = registrar;

Expand Down Expand Up @@ -74,13 +76,20 @@ public Mission(final Registrar registrar, final Instant planStart, final Configu
registrar.real("/simple_data/b/rate", this.simpleData.b.rate);
registrar.real("/simple_data/total_volume", this.simpleData.totalVolume);

registrar.discrete("/counter", this.counter, new IntegerValueMapper());

spawn(timeTrackerDaemon::run);

spawn(() -> { // Register a never-ending daemon task
while (true) {
ModelActions.delay(Duration.SECOND);
spawn(replaying(new Runnable() {
@Override
public void run() { // Register a never-ending daemon task
for (int i = 0; i < 1000; i++) {
ModelActions.delay(Duration.SECOND);
}
counter.add(1);
spawn(replaying(this));
}
});
}));

if(config.raiseException) {
spawn(() -> {
Expand Down
Loading
Loading