From 4e36ff71f08aac3bd7c999644ff9d7e7a594ce7d Mon Sep 17 00:00:00 2001 From: Marc Nuri Date: Fri, 12 Jan 2024 14:47:08 +0100 Subject: [PATCH] refactor: PodLogService uses informer Signed-off-by: Marc Nuri --- .../config/service/PodLogEventHandler.java | 219 ++++++++++++++++++ .../kit/config/service/PodLogService.java | 170 ++------------ .../kit/config/service/PodLogServiceTest.java | 14 +- 3 files changed, 238 insertions(+), 165 deletions(-) create mode 100644 jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogEventHandler.java diff --git a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogEventHandler.java b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogEventHandler.java new file mode 100644 index 0000000000..c4cc566e46 --- /dev/null +++ b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogEventHandler.java @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2019 Red Hat, Inc. + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at: + * + * https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + * + * Contributors: + * Red Hat, Inc. - initial API and implementation + */ +package org.eclipse.jkube.kit.config.service; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.dsl.Loggable; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import org.apache.commons.lang3.StringUtils; +import org.eclipse.jkube.kit.common.KitLogger; +import org.eclipse.jkube.kit.common.util.KubernetesHelper; + +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; + +import static org.eclipse.jkube.kit.common.util.KubernetesHelper.getName; +import static org.eclipse.jkube.kit.config.service.PodLogService.OPERATION_STOP; +import static org.eclipse.jkube.kit.config.service.PodLogService.OPERATION_UNDEPLOY; +import static org.eclipse.jkube.kit.config.service.kubernetes.KubernetesClientUtil.getPodStatusDescription; + +public class PodLogEventHandler implements ResourceEventHandler, AutoCloseable { + + private final PodLogService.PodLogServiceContext context; + private final KubernetesClient kubernetesClient; // TODO, move to context + private final String onExitOperation; + private final boolean followLog; + // When followLog=false, Used as a latch to signal that the log watch has been terminated + private final CompletableFuture logsRetrieved; + private final ConcurrentMap activePods; + private final AtomicReference currentLogWatch; + + + public PodLogEventHandler(PodLogService.PodLogServiceContext context, KubernetesClient kubernetesClient, String onExitOperation, boolean followLog) { + this.context = context; + this.kubernetesClient = kubernetesClient; + this.onExitOperation = onExitOperation; + this.followLog = followLog; + activePods = new ConcurrentHashMap<>(); + currentLogWatch = new AtomicReference<>(); + logsRetrieved = new CompletableFuture<>(); + } + + @Override + public void onAdd(Pod pod) { + activePods.put(getName(pod), pod); + logStatus(context.getNewPodLog(), pod); + podLog(); + } + + @Override + public void onUpdate(Pod oldPod, Pod newPod) { + activePods.put(getName(newPod), newPod); + if (!Objects.equals(getCurrentlyLoggedPodName(), getName(newPod))) { + logStatus(Objects.equals(getName(mostRecentPod()), getName(newPod)) ? context.getNewPodLog() : context.getOldPodLog(), newPod); + } + podLog(); + } + + @Override + public void onDelete(Pod pod, boolean deletedFinalStateUnknown) { + activePods.remove(getName(pod)); + if (Objects.equals(getCurrentlyLoggedPodName(), getName(pod))) { + context.getLog().info("Closing log watcher for %s (Deleted)", getCurrentlyLoggedPodName()); + currentLogWatch.getAndSet(null).close(); + } + logStatus(context.getOldPodLog(), pod,": Pod Deleted"); + podLog(); + } + + @Override + public void close() { + if (currentLogWatch.get() != null) { + currentLogWatch.get().close(); + } + } + + public final String getCurrentlyLoggedPodName() { + return currentLogWatch.get() != null ? currentLogWatch.get().podName : null; + } + + public final CompletableFuture getLogsRetrieved() { + return logsRetrieved; + } + + private void podLog() { + final Pod pod = mostRecentPod(); + final String podName = getName(pod); + if (pod == null || !KubernetesHelper.isPodRunning(pod) || Objects.equals(getCurrentlyLoggedPodName(), podName)) { + return; + } + if (currentLogWatch.get() != null) { + context.getLog().info("Closing log watcher for %s as now watching %s", getCurrentlyLoggedPodName(), podName); + currentLogWatch.getAndSet(null).close(); + } + final List containers = KubernetesHelper.getContainers(pod); + final String containerName; + final Loggable loggable; + if (containers.size() < 2) { + containerName = containers.isEmpty() ? null : containers.iterator().next().getName(); + loggable = kubernetesClient.pods().withName(podName); + } else { + containerName = getLogContainerName(containers); + loggable = kubernetesClient.pods().withName(podName).inContainer(containerName); + } + if (followLog) { + currentLogWatch.set(watchLog(loggable, podName, containerName)); + } else { + printLog(loggable, podName, containerName); + } + } + + private LogWatchLogger watchLog(Loggable loggable, String podName, String containerName) { + context.getNewPodLog().info("Tailing log of pod: " + podName + containerNameMessage(containerName)); + context.getNewPodLog().info("Press Ctrl-C to " + computeCtrlCMessage()); + context.getNewPodLog().info(""); + final LogWatch logWatch = loggable.watchLog(); + // It's important to persist this CompletableFuture and not a chained one, this one will allow to stop the log watch + final CompletableFuture asyncLogger = KubernetesHelper + .printLogsAsync(logWatch, line -> context.getLog().info("[[s]]%s", line)); + asyncLogger.whenComplete((v, t) -> { + if (t != null) { + context.getLog().error("Failed to read log of Pod %s: %s", podName, t); + } + }); + return new LogWatchLogger(logWatch, podName, asyncLogger); + } + + private void printLog(Loggable loggable, String podName, String containerName) { + final String logText = loggable.getLog(); + if (logText != null) { + String[] lines = logText.split("\n"); + context.getLog().info("Log of pod: %s%s", podName, containerNameMessage(containerName)); + context.getLog().info(""); + for (String line : lines) { + context.getLog().info("[[s]]%s", line); + } + } + logsRetrieved.complete(null); + } + + private Pod mostRecentPod() { + return KubernetesHelper.getNewestPod(activePods.values()); + } + + private String getLogContainerName(List containers) { + if (StringUtils.isNotBlank(context.getLogContainerName())) { + for (Container container : containers) { + if (Objects.equals(context.getLogContainerName(), container.getName())) { + return context.getLogContainerName(); + } + } + context.getLog().error("log container name %s does not exist in pod!! Did you set the correct value for property 'jkube.log.container'", context.getLogContainerName()); + } + return containers.get(0).getName(); + } + + private String computeCtrlCMessage() { + if (StringUtils.isNotBlank(onExitOperation)) { + if (onExitOperation.toLowerCase(Locale.ROOT).equals(OPERATION_UNDEPLOY)) { + return "undeploy the app"; + } else if (onExitOperation.toLowerCase(Locale.ROOT).equals(OPERATION_STOP)) { + return "scale down the app and stop tailing the log"; + } + } + return "stop tailing the log"; + } + + private static String containerNameMessage(String containerName) { + if (StringUtils.isNotBlank(containerName)) { + return " container: " + containerName; + } + return ""; + } + + private static void logStatus(KitLogger logger, Pod pod) { + logStatus(logger, pod, ""); + } + + private static void logStatus(KitLogger logger, Pod pod, String postfix) { + logger.info("%s status: %s%s", getName(pod), getPodStatusDescription(pod), postfix); + } + + private static final class LogWatchLogger implements AutoCloseable { + private final LogWatch logWatch; + private final String podName; + private final CompletableFuture asyncLogger; + + + public LogWatchLogger(LogWatch logWatch, String podName, CompletableFuture asyncLogger) { + this.logWatch = logWatch; + this.podName = podName; + this.asyncLogger = asyncLogger; + } + + @Override + public void close() { + asyncLogger.complete(null); + logWatch.close(); + } + } +} diff --git a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java index 432970f1c7..0a7418aaad 100644 --- a/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java +++ b/jkube-kit/config/service/src/main/java/org/eclipse/jkube/kit/config/service/PodLogService.java @@ -13,45 +13,34 @@ */ package org.eclipse.jkube.kit.config.service; -import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.NamespacedKubernetesClient; -import io.fabric8.kubernetes.client.Watch; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; -import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.eclipse.jkube.kit.common.KitLogger; import org.eclipse.jkube.kit.common.util.AsyncUtil; import org.eclipse.jkube.kit.common.util.KubernetesHelper; -import org.apache.commons.lang3.StringUtils; import java.time.Instant; import java.util.Collection; import java.util.Date; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import static org.eclipse.jkube.kit.common.util.KubernetesHelper.withSelector; import static org.eclipse.jkube.kit.config.service.kubernetes.KubernetesClientUtil.deleteEntities; import static org.eclipse.jkube.kit.config.service.kubernetes.KubernetesClientUtil.deleteOpenShiftEntities; -import static org.eclipse.jkube.kit.config.service.kubernetes.KubernetesClientUtil.getPodStatusDescription; -import static org.eclipse.jkube.kit.config.service.kubernetes.KubernetesClientUtil.getPodStatusMessagePostfix; import static org.eclipse.jkube.kit.config.service.kubernetes.KubernetesClientUtil.resizeApp; /** @@ -65,11 +54,7 @@ public class PodLogService { private final PodLogServiceContext context; private final KitLogger log; - private Watch podWatcher; - private LogWatch logWatcher; - private final Map addedPods = new ConcurrentHashMap<>(); - private String watchingPodName; - private CountDownLatch logWatchTerminateLatch; + private SharedIndexInformer podInformer; public PodLogService(PodLogServiceContext context) { this.context = context; @@ -89,14 +74,9 @@ public void tailAppPodsLogs(final KubernetesClient kubernetes, final String name LabelSelector selector = KubernetesHelper.extractPodLabelSelector(entities); if (selector != null || StringUtils.isNotBlank(context.getPodName())) { - String ctrlCMessage = "stop tailing the log"; if (StringUtils.isNotBlank(onExitOperation)) { final String onExitOperationLower = onExitOperation.toLowerCase().trim(); - if (onExitOperationLower.equals(OPERATION_UNDEPLOY)) { - ctrlCMessage = "undeploy the app"; - } else if (onExitOperationLower.equals(OPERATION_STOP)) { - ctrlCMessage = "scale down the app and stop tailing the log"; - } else { + if (!onExitOperationLower.equals(OPERATION_UNDEPLOY) && !onExitOperationLower.equals(OPERATION_STOP)) { log.warn("Unknown on-exit command: `%s`", onExitOperationLower); } resizeApp(nsKubernetesClient, entities, 1, log); @@ -113,20 +93,19 @@ public void run() { log.info("Stopping the app:"); resizeApp(nsKubernetesClient, entities, 0, log); } - if (podWatcher != null) { - podWatcher.close(); + if (podInformer != null) { + podInformer.close(); } - closeLogWatcher(); } }); } - waitAndLogPods(nsKubernetesClient, selector, watchAddedPodsOnly, ctrlCMessage, followLog, ignorePodsOlderThan, waitInCurrentThread); + waitAndLogPods(nsKubernetesClient, selector, watchAddedPodsOnly, onExitOperation, followLog, ignorePodsOlderThan, waitInCurrentThread); } else { log.warn("No selector detected and no Pod name specified, cannot watch Pods!"); } } - private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector selector, final boolean watchAddedPodsOnly, final String ctrlCMessage, final boolean + private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector selector, final boolean watchAddedPodsOnly, String onExitOperation, final boolean followLog, Date ignorePodsOlderThan, boolean waitInCurrentThread) { final FilterWatchListDeletable pods; if (StringUtils.isNotBlank(context.getPodName())) { @@ -159,139 +138,16 @@ private void waitAndLogPods(final NamespacedKubernetesClient kc, LabelSelector s } } } - final CompletableFuture logsRetrieved = new CompletableFuture<>(); - // we may have missed the ADDED event so lets simulate one - if (latestPod != null) { - onPod(logsRetrieved, Watcher.Action.ADDED, latestPod, kc, ctrlCMessage, followLog); - } if (!watchAddedPodsOnly && !runningPod) { log.warn("No pod is running yet. Are you sure you deployed your app using Eclipse JKube apply/deploy mechanism?"); log.warn("Or did you undeploy it? If so try running the Eclipse JKube apply/deploy tasks again."); } - podWatcher = pods.watch(new Watcher() { - @Override - public void eventReceived(Action action, Pod pod) { - onPod(logsRetrieved, action, pod, kc, ctrlCMessage, followLog); - } - - @Override - public void onClose(WatcherException e) { - // ignore - } - }); - - if (waitInCurrentThread && !logsRetrieved.isDone()) { - AsyncUtil.get(logsRetrieved); - } - } - - private void onPod(CompletableFuture logsRetrieved, Watcher.Action action, Pod pod, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog) { - String name = KubernetesHelper.getName(pod); - if (action.equals(Watcher.Action.DELETED)) { - addedPods.remove(name); - if (Objects.equals(watchingPodName, name)) { - watchingPodName = null; - addedPods.remove(name); - } - } else { - if (action.equals(Watcher.Action.ADDED) || action.equals(Watcher.Action.MODIFIED)) { - addedPods.put(name, pod); - } - } - - Pod watchPod = KubernetesHelper.getNewestPod(addedPods.values()); - String newestPodName = KubernetesHelper.getName(watchPod); - - KitLogger statusLog = Objects.equals(name, newestPodName) ? context.getNewPodLog() : context.getOldPodLog(); - if (!action.equals(Watcher.Action.MODIFIED) || watchingPodName == null || !watchingPodName.equals(name)) { - statusLog.info("%s status: %s%s", name, getPodStatusDescription(pod), getPodStatusMessagePostfix(action)); - } - - if (watchPod != null && KubernetesHelper.isPodRunning(watchPod)) { - watchLogOfPodName(logsRetrieved, kubernetes, ctrlCMessage, followLog, watchPod, KubernetesHelper.getName(watchPod)); - } - } - - private void watchLogOfPodName(CompletableFuture logsRetrieved, NamespacedKubernetesClient kubernetes, String ctrlCMessage, boolean followLog, Pod pod, String name) { - if (watchingPodName == null || !watchingPodName.equals(name)) { - if (logWatcher != null) { - log.info("Closing log watcher for %s as now watching %s", watchingPodName, name); - closeLogWatcher(); - } - PodResource podResource = kubernetes.pods().withName(name); - List containers = KubernetesHelper.getContainers(pod); - String containerName = null; - if (followLog) { - watchingPodName = name; - logWatchTerminateLatch = new CountDownLatch(1); - if (containers.size() < 2) { - logWatcher = podResource.watchLog(); - } else { - containerName = getLogContainerName(containers); - logWatcher = podResource.inContainer(containerName).watchLog(); - } - watchLog(logWatcher, name, "Failed to read log of pod " + name + ".", ctrlCMessage, containerName); - } else { - String logText; - if (containers.size() < 2) { - logText = podResource.getLog(); - } else { - containerName = getLogContainerName(containers); - logText = podResource.inContainer(containerName).getLog(); - } - if (logText != null) { - String[] lines = logText.split("\n"); - log.info("Log of pod: %s%s", name, containerNameMessage(containerName)); - log.info(""); - for (String line : lines) { - log.info("[[s]]%s", line); - } - } - logsRetrieved.complete(null); - } - } - } - - private String getLogContainerName(List containers) { - if (StringUtils.isNotBlank(context.getLogContainerName())) { - for (Container container : containers) { - if (Objects.equals(context.getLogContainerName(), container.getName())) { - return context.getLogContainerName(); - } - } - log.error("log container name %s does not exist in pod!! Did you set the correct value for property 'jkube.log.container'", context.getLogContainerName()); - } - return containers.get(0).getName(); - } - - private void closeLogWatcher() { - if (logWatcher != null) { - logWatcher.close(); - logWatcher = null; - } - if (logWatchTerminateLatch != null) { - logWatchTerminateLatch.countDown(); - } - } - - private void watchLog(final LogWatch logWatcher, String podName, final String failureMessage, String ctrlCMessage, String containerName) { - context.getNewPodLog().info("Tailing log of pod: " + podName + containerNameMessage(containerName)); - context.getNewPodLog().info("Press Ctrl-C to " + ctrlCMessage); - context.getNewPodLog().info(""); - - KubernetesHelper.printLogsAsync(logWatcher, line -> log.info("[[s]]%s", line)) - .whenComplete((v, t) -> { - if (t != null) { - log.error("%s: %s", failureMessage, t); - } - }); - } - - private String containerNameMessage(String containerName) { - if (StringUtils.isNotBlank(containerName)) { - return " container: " + containerName; + final PodLogEventHandler podLogEventHandler = new PodLogEventHandler(context, kc, onExitOperation, followLog); + podInformer = pods.inform(podLogEventHandler); + podInformer.stopped().whenComplete((v, t) -> podLogEventHandler.close()); + if (waitInCurrentThread && !podLogEventHandler.getLogsRetrieved().isDone()) { + AsyncUtil.get(podLogEventHandler.getLogsRetrieved()); } - return ""; } // ======================================= diff --git a/jkube-kit/config/service/src/test/java/org/eclipse/jkube/kit/config/service/PodLogServiceTest.java b/jkube-kit/config/service/src/test/java/org/eclipse/jkube/kit/config/service/PodLogServiceTest.java index 483e1ddd9b..7768124a86 100644 --- a/jkube-kit/config/service/src/test/java/org/eclipse/jkube/kit/config/service/PodLogServiceTest.java +++ b/jkube-kit/config/service/src/test/java/org/eclipse/jkube/kit/config/service/PodLogServiceTest.java @@ -177,8 +177,6 @@ void podRunningAndDeletionShouldLogStatusUpdates() { @Test @DisplayName("With Pod running and deletion, should close previous watch") - @Disabled("This is a bug with the current implementation") - // TODO fix bug void podRunningAndDeletionShouldClosePreviousWatch() { // Given kubernetesClient.resource(runningPod).createOr(NonDeletingOperation::update); @@ -188,7 +186,7 @@ void podRunningAndDeletionShouldClosePreviousWatch() { // When kubernetesClient.resource(runningPod).delete(); // Then - verify(log, timeout(1000)).info("Closing log watcher for %s as now watching %s", "the-pod", "new-pod"); + verify(log, timeout(1000)).info("Closing log watcher for %s (Deleted)", "the-pod"); } @Test @@ -245,7 +243,7 @@ void podRunningShouldLogContainerLogs() { } @Test - @DisplayName("With Pod running and no follow, should log container logs synchronously") + @DisplayName("With Pod running and no follow, should log container logs") void podRunningWithNoFollowShouldLogContainerLogs() { // Given kubernetesClient.resource(runningPod).createOr(NonDeletingOperation::update); @@ -258,9 +256,9 @@ void podRunningWithNoFollowShouldLogContainerLogs() { new PodLogService(podLogServiceContext) .tailAppPodsLogs(kubernetesClient, null, entities, false, null, false, null, false); // Then - verify(log).info("[[s]]%s", "The"); - verify(log).info("[[s]]%s", "Application"); - verify(log).info("[[s]]%s", "Logs"); + verify(log, timeout(1000)).info("[[s]]%s", "The"); + verify(log, timeout(1000)).info("[[s]]%s", "Application"); + verify(log, timeout(1000)).info("[[s]]%s", "Logs"); } @Test @@ -305,7 +303,7 @@ void podRunningAndDeletedShouldClosePreviousWatcher() { kubernetesClient.resource(new PodBuilder(runningPod).editMetadata().withName("new-pod").endMetadata().build()) .createOr(NonDeletingOperation::update); // Then - verify(log, timeout(1000)).info("Closing log watcher for %s as now watching %s", null, "new-pod"); + verify(log, timeout(1000)).info("Closing log watcher for %s (Deleted)", "the-pod"); // Finally (prevents Client from being closed before completing the log execution) verify(newPodLog, timeout(1000)).info("Tailing log of pod: new-pod"); }