Skip to content

Commit

Permalink
feat: add I#vouch
Browse files Browse the repository at this point in the history
  • Loading branch information
teletha committed Nov 11, 2024
1 parent b8e9dde commit c19c7b9
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 105 deletions.
26 changes: 17 additions & 9 deletions src/main/java/kiss/I.java
Original file line number Diff line number Diff line change
Expand Up @@ -969,11 +969,8 @@ public static <T> Signal<T> http(String request, Class<T> type, HttpClient... cl
*/
public static <T> Signal<T> http(HttpRequest.Builder request, Class<T> type, HttpClient... client) {
return new Signal<>((observer, disposer) -> {
return disposer.add(I.signal(client)
.to()
.or(I.client)
.sendAsync(request.build(), BodyHandlers.ofInputStream())
.whenComplete((res, e) -> {
return disposer
.add(I.vouch(I.client, client).sendAsync(request.build(), BodyHandlers.ofInputStream()).whenComplete((res, e) -> {
if (e == null) try {
if (res.statusCode() < 400) {
InputStream in = res.body();
Expand Down Expand Up @@ -1027,9 +1024,7 @@ public static Signal<String> http(String uri, Consumer<WebSocket> open, HttpClie
sub.next = open;
sub.o = new StringBuilder();

return disposer.add(I.signal(client)
.to()
.or(I.client)
return disposer.add(I.vouch(I.client, client)
.newWebSocketBuilder()
.connectTimeout(Duration.ofSeconds(15))
.buildAsync(URI.create(uri), sub)
Expand Down Expand Up @@ -2021,7 +2016,7 @@ public static Signal<Long> schedule(long delayTime, long intervalTime, TimeUnit
Runnable task = I.wiseC(observer).bindLast(null);
Future future;

ScheduledExecutorService exe = scheduler == null || scheduler.length == 0 || scheduler[0] == null ? I.Jobs : scheduler[0];
ScheduledExecutorService exe = vouch(Jobs, scheduler);

if (intervalTime <= 0) {
future = delayTime <= 0 ? CompletableFuture.runAsync(task, Runnable::run) : exe.schedule(task, delayTime, unit);
Expand Down Expand Up @@ -2235,6 +2230,19 @@ public static Class type(String fqcn) {
}
}

/**
* Obtains the last non-null value from the specified array. If there is no suitable value in
* the array or the array itself, the default value is retrieved.
*
* @param <T> A type of value.
* @param defaults A default value.
* @param values A candidate of values.
* @return A suitable value.
*/
public static <T> T vouch(T defaults, T... values) {
return I.signal(values).skipNull().to().or(defaults);
}

/**
* Write {@link java.lang.System.Logger.Level#WARNING} log.
*
Expand Down
104 changes: 8 additions & 96 deletions src/main/java/kiss/Signal.java
Original file line number Diff line number Diff line change
Expand Up @@ -1822,31 +1822,8 @@ public Signal<Boolean> isTerminated() {
* @param function A mapper function.
* @return {ChainableAPI}
*/
public <R> Signal<R> joinAll(WiseFunction<V, R> function) {
return joinAll(function, null);
}

/**
* Returns a new {@link Signal} that invokes the mapper action in parallel thread and waits all
* of them until all actions are completed.
*
* @param function A mapper function.
* @return {ChainableAPI}
*/
public <R> Signal<R> joinAll(WiseFunction<V, R> function, ExecutorService executor) {
return map(function::bind).buffer()
.flatIterable(v -> I.signal((executor == null ? I.Jobs : executor).invokeAll(v)).map(Future::get).toList());
}

/**
* Returns a new {@link Signal} that invokes the mapper action in parallel thread and waits
* until any single action is completed. All other actions will be cancelled.
*
* @param function A mapper function.
* @return {ChainableAPI}
*/
public <R> Signal<R> joinAny(WiseFunction<V, R> function) {
return joinAny(function, null);
public <R> Signal<R> joinAll(WiseFunction<V, R> function, ExecutorService... executor) {
return map(function::bind).buffer().flatIterable(v -> I.signal(I.vouch(I.Jobs, executor).invokeAll(v)).map(Future::get).toList());
}

/**
Expand All @@ -1856,8 +1833,9 @@ public <R> Signal<R> joinAny(WiseFunction<V, R> function) {
* @param function A mapper function.
* @return {ChainableAPI}
*/
public <R> Signal<R> joinAny(WiseFunction<V, R> function, ExecutorService executor) {
return map(function::bind).buffer().map((executor == null ? I.Jobs : executor)::invokeAny);
@SuppressWarnings("resource")
public <R> Signal<R> joinAny(WiseFunction<V, R> function, ExecutorService... executor) {
return map(function::bind).buffer().map(I.vouch(I.Jobs, executor)::invokeAny);
}

/**
Expand Down Expand Up @@ -2116,20 +2094,6 @@ public <O> Signal<O> plug(Function<Signal<V>, Signal<O>> plug) {
return plug.apply(this);
}

/**
* <p>
* Returns an {@link Signal} that emits items based on applying a function that you supply to
* each item emitted by the source {@link Signal}, where that function returns an {@link Signal}
* , and then merging those resulting {@link Signal} and emitting the results of this merger.
* </p>
*
* @param recurse A mapper function to enumerate values recursively.
* @return {ChainableAPI}
*/
public Signal<V> recurse(WiseFunction<V, V> recurse) {
return recurse(recurse, Runnable::run);
}

/**
* <p>
* Returns an {@link Signal} that emits items based on applying a function that you supply to
Expand All @@ -2141,24 +2105,10 @@ public Signal<V> recurse(WiseFunction<V, V> recurse) {
* @param executor An execution context.
* @return {ChainableAPI}
*/
public Signal<V> recurse(WiseFunction<V, V> recurse, Executor executor) {
public Signal<V> recurse(WiseFunction<V, V> recurse, Executor... executor) {
return recurseMap(e -> e.map(recurse), executor);
}

/**
* <p>
* Returns an {@link Signal} that emits items based on applying a function that you supply to
* each item emitted by the source {@link Signal}, where that function returns an {@link Signal}
* , and then merging those resulting {@link Signal} and emitting the results of this merger.
* </p>
*
* @param recurse A mapper function to enumerate values recursively.
* @return {ChainableAPI}
*/
public Signal<V> recurseMap(WiseFunction<Signal<V>, Signal<V>> recurse) {
return recurseMap(recurse, Runnable::run);
}

/**
* <p>
* Returns an {@link Signal} that emits items based on applying a function that you supply to
Expand All @@ -2170,10 +2120,10 @@ public Signal<V> recurseMap(WiseFunction<Signal<V>, Signal<V>> recurse) {
* @param executor An execution context.
* @return {ChainableAPI}
*/
public Signal<V> recurseMap(WiseFunction<Signal<V>, Signal<V>> recurse, Executor executor) {
public Signal<V> recurseMap(WiseFunction<Signal<V>, Signal<V>> recurse, Executor... executor) {
// DON'T use the recursive call, it will throw StackOverflowError.
return flatMap(init -> new Signal<>((observer, disposer) -> {
(executor == null ? I.Jobs : executor).execute(() -> {
I.vouch(Runnable::run, executor).execute(() -> {
try {
LinkedList<V> values = new LinkedList(); // LinkedList accepts null
LinkedTransferQueue<Signal<V>> signal = new LinkedTransferQueue();
Expand Down Expand Up @@ -3514,42 +3464,4 @@ private <T> Signal<T> signal(Predicate<? super V> emitCondition, T emitOutput, b
}, sub, false);
});
}

// /**
// * <p>
// * Append the current time to each event.
// * </p>
// *
// * @return Chainable API.
// */
// public Signal<Ⅱ<V, Instant>> timeStamp() {
// return map(value -> I.pair(value, Instant.now()));
// }
//
// /**
// * <p>
// * Append {@link Duration} between the current value and the previous value.
// * </p>
// *
// * @return Chainable API.
// */
// public Signal<Ⅱ<V, Duration>> timeInterval() {
// return timeStamp().map((Ⅱ) null, (prev, now) -> I.pair(now.ⅰ, Duration.between(prev == null ?
// now.ⅱ : prev.ⅱ, now.ⅱ)));
// }
//
// /**
// * <p>
// * Append {@link Duration} between the current value and the first value.
// * </p>
// *
// * @return Chainable API.
// */
// public Signal<Ⅱ<V, Duration>> timeElapsed() {
// return map(Variable::<Instant> empty, (context, value) -> {
// Instant prev = context.let(Instant::now);
//
// return I.pair(value, prev == null ? Duration.ZERO : Duration.between(prev, Instant.now()));
// });
// }
}
40 changes: 40 additions & 0 deletions src/test/java/kiss/core/VouchTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2024 Nameless Production Committee
*
* Licensed under the MIT License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/mit-license.php
*/
package kiss.core;

import org.junit.jupiter.api.Test;

import kiss.I;

class VouchTest {

@Test
void vouch() {
assert I.vouch("DEF", "A").equals("A");
assert I.vouch("DEF", "A", "B", "C").equals("C");
assert I.vouch("DEF", "A", "B", null).equals("B");
assert I.vouch("DEF", "A", null, "C").equals("C");
}

@Test
void empty() {
assert I.vouch("DEF", new String[0]).equals("DEF");
}

@Test
void nullValues() {
assert I.vouch("DEF", (String[]) null).equals("DEF");
}

@Test
void nullDefault() {
assert I.vouch(null, (String[]) null) == null;
}
}

0 comments on commit c19c7b9

Please sign in to comment.