Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use skip-list multi-map to speed up job queue #1212

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,59 +1,55 @@
package gov.nasa.jpl.aerie.merlin.driver.engine;

import gov.nasa.jpl.aerie.merlin.protocol.types.Duration;
import org.apache.commons.lang3.tuple.Pair;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;

public final class JobSchedule<JobRef, TimeRef extends SchedulingInstant> {
/** The scheduled time for each upcoming job. */
private final Map<JobRef, TimeRef> scheduledJobs = new HashMap<>();

/** A time-ordered queue of all tasks whose resumption time is concretely known. */
@DerivedFrom("scheduledJobs")
private final PriorityQueue<Pair<TimeRef, JobRef>> queue = new PriorityQueue<>(Comparator.comparing(Pair::getLeft));
private final ConcurrentSkipListMap<TimeRef, Set<JobRef>> queue = new ConcurrentSkipListMap<>();

public void schedule(final JobRef job, final TimeRef time) {
final var oldTime = this.scheduledJobs.put(job, time);

if (oldTime != null) this.queue.remove(Pair.of(oldTime, job));
this.queue.add(Pair.of(time, job));
if (oldTime != null) removeJobFromQueue(oldTime, job);

this.queue.computeIfAbsent(time, $ -> new HashSet<>()).add(job);
}

public void unschedule(final JobRef job) {
final var oldTime = this.scheduledJobs.remove(job);
if (oldTime != null) removeJobFromQueue(oldTime, job);
}

if (oldTime != null) this.queue.remove(Pair.of(oldTime, job));
private void removeJobFromQueue(TimeRef time, JobRef job) {
var jobsAtOldTime = this.queue.get(time);
jobsAtOldTime.remove(job);
if (jobsAtOldTime.isEmpty()) {
this.queue.remove(time);
}
}

public Batch<JobRef> extractNextJobs(final Duration maximumTime) {
if (this.queue.isEmpty()) return new Batch<>(maximumTime, Collections.emptySet());

final var time = this.queue.peek().getKey();
final var time = this.queue.firstKey();
if (time.project().longerThan(maximumTime)) {
return new Batch<>(maximumTime, Collections.emptySet());
}

// Ready all tasks at the soonest task time.
final var readyJobs = new HashSet<JobRef>();
while (true) {
final var entry = this.queue.peek();
if (entry == null) break;
if (entry.getLeft().compareTo(time) > 0) break;

this.scheduledJobs.remove(entry.getRight());
this.queue.remove();

readyJobs.add(entry.getRight());
}

return new Batch<>(time.project(), readyJobs);
final var entry = this.queue.pollFirstEntry();
entry.getValue().forEach(this.scheduledJobs::remove);
return new Batch<>(entry.getKey().project(), entry.getValue());
}

public void clear() {
Expand Down
Loading