Skip to content

Commit

Permalink
fix: use TestableScheduler instead of Chronos
Browse files Browse the repository at this point in the history
  • Loading branch information
teletha committed Oct 11, 2024
1 parent 770699b commit 3bbec5b
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 7 deletions.
111 changes: 110 additions & 1 deletion src/test/java/kiss/TestableScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@
*/
package kiss;

import static java.util.concurrent.TimeUnit.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class TestableScheduler extends Scheduler {

private long awaitingLimit = 1000;
private long awaitingLimit = 3000;

private final AtomicBoolean starting = new AtomicBoolean();

Expand Down Expand Up @@ -198,6 +202,111 @@ public boolean awaitExecutions(long required) {
return true;
}

public boolean await() {
return start().awaitIdling();
}

/**
* Freeze process.
*/
public void await(long time, TimeUnit unit) {
freezeNano(unit.toNanos(time));
}

/**
* <p>
* Freeze process.
* </p>
*
* @param time A nano time to freeze.
*/
private void freezeNano(long time) {
try {
long start = System.nanoTime();
NANOSECONDS.sleep(time);
long end = System.nanoTime();

long remaining = start + time - end;

if (0 < remaining) {
freezeNano(remaining);
}
} catch (InterruptedException e) {
throw new Error(e);
}
}

private long marked;

/**
* Record the current time. Hereafter, it is used as the start time when using
* {@link #elapse(int, TimeUnit)} or {@link #within(int, TimeUnit, Runnable)}.
*/
public final TestableScheduler mark() {
marked = System.nanoTime();

return this;
}

/**
* <p>
* Waits for the specified time from the marked time. It does not wait if it has already passed.
* </p>
* <pre>
* chronus.mark();
* asynchronous.process();
*
* chronus.elapse(100, TimeUnit.MILLSECONDS);
* assert validation.code();
* </pre>
*
* @param amount Time amount.
* @param unit Time unit.
*/
public final TestableScheduler elapse(int amount, TimeUnit unit) {
long startTime = marked + unit.toNanos(amount);

await(startTime - System.nanoTime(), NANOSECONDS);

return this;
}

/**
* <p>
* Performs the specified operation if the specified time has not yet elapsed since the marked
* time. If it has already passed, do nothing.
* </p>
* <pre>
* chronus.mark();
* synchronous.process();
*
* chronus.within(100, TimeUnit.MILLSECONDS, () -> {
* assert validation.code();
* });
* </pre>
*
* @param amount Time amount.
* @param unit Time unit.
* @param within Your process.
*/
public final TestableScheduler within(int amount, TimeUnit unit, Runnable within) {
if (within != null && System.nanoTime() < marked + unit.toNanos(amount)) {
within.run();
}
return this;
}

/**
* Create delayed {@link Executors} in the specified duration.
*
* @param time A delay time.
* @param unit A time unit.
* @return A delayed {@link Executor}.
*/
public Executor in(long time, TimeUnit unit) {
return task -> schedule(task, time, unit);
}

/**
* {@inheritDoc}
*/
Expand Down
6 changes: 4 additions & 2 deletions src/test/java/kiss/signal/OnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
import org.junit.jupiter.api.Test;

import kiss.I;
import kiss.TestableScheduler;

class OnTest extends SignalTester {

private Consumer<Runnable> after = runner -> scheduler.schedule(runner, delay, ms);

@Test
void on() {
monitor(signal -> signal.on(after).map(v -> Thread.currentThread().getName().contains("pool")));
Thread now = Thread.currentThread();
monitor(signal -> signal.on(after).map(v -> Thread.currentThread() != now));

main.emit("START");
assert main.value();
Expand Down Expand Up @@ -57,7 +59,7 @@ void complete() {
void dispose() {
// Signal#on doesn't guarantee the execution order of events (including COMPLETE)
// Single thread executor will arrange all events in serial. (FIFO)
scheduler.configExecutionLimit(1);
scheduler = new TestableScheduler(1);

monitor(signal -> signal.take(1).on(after));

Expand Down
2 changes: 1 addition & 1 deletion src/test/java/kiss/signal/SignalCreationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void interval() {
assert main.isNotCompleted();
assert main.isNotDisposed();

scheduler.await(200, ms);
scheduler.await();
assert main.value(true, true, true);
assert main.isCompleted();
assert main.isDisposed();
Expand Down
5 changes: 2 additions & 3 deletions src/test/java/kiss/signal/SignalTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
Expand All @@ -28,11 +27,11 @@
import java.util.function.Supplier;
import java.util.stream.Stream;

import antibug.Chronus;
import kiss.Disposable;
import kiss.I;
import kiss.Observer;
import kiss.Signal;
import kiss.TestableScheduler;
import kiss.WiseBiFunction;
import kiss.WiseConsumer;
import kiss.WiseFunction;
Expand Down Expand Up @@ -76,7 +75,7 @@ public class SignalTester {
protected final SignalSource another = new SignalSource();

/** The chrono scheduler. */
protected final Chronus scheduler = new Chronus(() -> Executors.newScheduledThreadPool(16));
protected TestableScheduler scheduler = new TestableScheduler();

/** The log manager. */
private Map<String, List> logs;
Expand Down

0 comments on commit 3bbec5b

Please sign in to comment.