Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: temporary usage of local CompletableFuture #2544

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ public static <T> T get(CompletableFuture<T> completableFuture, Duration duratio
}
}

public static <T> T get(CompletableFuture<T> completableFuture) {
try {
return completableFuture.get();
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

@FunctionalInterface
public interface ThrowingFunction<T, R> {
R apply(T t) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.eclipse.jkube.kit.common.KitLogger;
import org.eclipse.jkube.kit.common.util.AsyncUtil;
import org.eclipse.jkube.kit.common.util.KubernetesHelper;
import org.apache.commons.lang3.StringUtils;

Expand All @@ -41,6 +42,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

Expand All @@ -65,7 +67,6 @@ public class PodLogService {
private Watch podWatcher;
private LogWatch logWatcher;
private final Map<String, Pod> addedPods = new ConcurrentHashMap<>();
private final CountDownLatch terminateLatch = new CountDownLatch(1);
private String watchingPodName;
private CountDownLatch logWatchTerminateLatch;

Expand Down Expand Up @@ -157,9 +158,10 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s
}
}
}
final CompletableFuture<Void> logsRetrieved = new CompletableFuture<>();
// we may have missed the ADDED event so lets simulate one
if (latestPod != null) {
onPod(Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog);
onPod(logsRetrieved, Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog);
}
if (!watchAddedPodsOnly && !runningPod) {
log.warn("No pod is running yet. Are you sure you deployed your app using Eclipse JKube apply/deploy mechanism?");
Expand All @@ -168,7 +170,7 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s
podWatcher = pods.watch(new Watcher<Pod>() {
@Override
public void eventReceived(Action action, Pod pod) {
onPod(action, pod, kc, ctrlCMessage, followLog);
onPod(logsRetrieved, action, pod, kc, ctrlCMessage, followLog);
}

@Override
Expand All @@ -177,18 +179,12 @@ public void onClose(WatcherException e) {
}
});

if (waitInCurrentThread) {
while (terminateLatch.getCount() > 0) {
try {
terminateLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (waitInCurrentThread && !logsRetrieved.isDone()) {
AsyncUtil.get(logsRetrieved);
}
}

private void onPod(Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) {
private void onPod(CompletableFuture<Void> logsRetrieved, Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) {
String name = KubernetesHelper.getName(pod);
if (action.equals(Watcher.Action.DELETED)) {
addedPods.remove(name);
Expand All @@ -211,11 +207,11 @@ private void onPod(Watcher.Action action, Pod pod, NamespacedKubernetesClient ku
}

if (watchPod != null && KubernetesHelper.isPodRunning(watchPod)) {
watchLogOfPodName(kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod));
watchLogOfPodName(logsRetrieved, kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod));
}
}

private void watchLogOfPodName(NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) {
private void watchLogOfPodName(CompletableFuture<Void> logsRetrieved, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) {
if (watchingPodName == null || !watchingPodName.equals(name)) {
if (logWatcher != null) {
log.info("Closing log watcher for %s as now watching %s", watchingPodName, name);
Expand Down Expand Up @@ -250,7 +246,7 @@ private void watchLogOfPodName(NamespacedKubernetesClient kubernetes, String ctr
log.info("[[s]]%s", line);
}
}
terminateLatch.countDown();
logsRetrieved.complete(null);
}
}
}
Expand Down
Loading