diff --git a/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperator.java b/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperator.java deleted file mode 100644 index 73f936ad..00000000 --- a/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperator.java +++ /dev/null @@ -1,310 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2022 EclipseSource, Lockular, Ericsson, STMicroelectronics and - * others. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v. 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0. - * - * This Source Code may also be made available under the following Secondary - * Licenses when the conditions for such availability set forth in the Eclipse - * Public License v. 2.0 are satisfied: GNU General Public License, version 2 - * with the GNU Classpath Exception which is available at - * https://www.gnu.org/software/classpath/license.html. - * - * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 - ********************************************************************************/ -package org.eclipse.theia.cloud.defaultoperator; - -import static org.eclipse.theia.cloud.common.util.LogMessageUtil.formatLogMessage; -import static org.eclipse.theia.cloud.common.util.LogMessageUtil.generateCorrelationId; - -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.eclipse.theia.cloud.common.k8s.client.TheiaCloudClient; -import org.eclipse.theia.cloud.common.k8s.resource.appdefinition.AppDefinition; -import org.eclipse.theia.cloud.common.k8s.resource.session.Session; -import org.eclipse.theia.cloud.common.k8s.resource.workspace.Workspace; -import org.eclipse.theia.cloud.operator.TheiaCloudOperator; -import org.eclipse.theia.cloud.operator.TheiaCloudOperatorArguments; -import org.eclipse.theia.cloud.operator.handler.appdef.AppDefinitionHandler; -import org.eclipse.theia.cloud.operator.handler.session.SessionHandler; -import org.eclipse.theia.cloud.operator.handler.ws.WorkspaceHandler; -import org.eclipse.theia.cloud.operator.plugins.OperatorPlugin; -import org.eclipse.theia.cloud.operator.util.SpecWatch; - -import com.google.inject.Inject; - -import io.fabric8.kubernetes.client.Watcher; - -public class DefaultTheiaCloudOperator implements TheiaCloudOperator { - - private static final ScheduledExecutorService STOP_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); - private static final ScheduledExecutorService WATCH_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); - - private static final Logger LOGGER = LogManager.getLogger(DefaultTheiaCloudOperator.class); - - private static final String COR_ID_APPDEFINITIONPREFIX = "appdefinition-watch-"; - private static final String COR_ID_WORKSPACEPREFIX = "workspace-watch-"; - private static final String COR_ID_SESSIONPREFIX = "session-watch-"; - private static final String COR_ID_TIMEOUTPREFIX = "timeout-"; - - @Inject - private TheiaCloudClient resourceClient; - - @Inject - protected Set operatorPlugins; - - @Inject - private AppDefinitionHandler appDefinitionAddedHandler; - - @Inject - private WorkspaceHandler workspaceHandler; - - @Inject - private SessionHandler sessionHandler; - - @Inject - private TheiaCloudOperatorArguments arguments; - - private final Map appDefinitionCache = new ConcurrentHashMap<>(); - private final Map workspaceCache = new ConcurrentHashMap<>(); - private final Map sessionCache = new ConcurrentHashMap<>(); - private final Set> watches = new LinkedHashSet<>(); - - @Override - public void start() { - this.operatorPlugins.forEach(plugin -> plugin.start()); - watches.add(initAppDefinitionsAndWatchForChanges()); - watches.add(initWorkspacesAndWatchForChanges()); - watches.add(initSessionsAndWatchForChanges()); - - STOP_EXECUTOR.scheduleWithFixedDelay(this::stopTimedOutSessions, 1, 1, TimeUnit.MINUTES); - WATCH_EXECUTOR.scheduleWithFixedDelay(this::lookForIdleWatches, 1, 1, TimeUnit.MINUTES); - } - - protected SpecWatch initAppDefinitionsAndWatchForChanges() { - try { - resourceClient.appDefinitions().list().forEach(this::initAppDefinition); - SpecWatch watcher = new SpecWatch<>(appDefinitionCache, this::handleAppDefnitionEvent, - "App Definition", COR_ID_APPDEFINITIONPREFIX); - resourceClient.appDefinitions().operation().watch(watcher); - return watcher; - } catch (Exception e) { - LOGGER.error(formatLogMessage(DefaultTheiaCloudOperatorLauncher.COR_ID_INIT, - "Error while initializing app definitions watch"), e); - System.exit(-1); - throw new IllegalStateException(); - } - } - - protected SpecWatch initWorkspacesAndWatchForChanges() { - try { - resourceClient.workspaces().list().forEach(this::initWorkspace); - SpecWatch watcher = new SpecWatch<>(workspaceCache, this::handleWorkspaceEvent, "Workspace", - COR_ID_WORKSPACEPREFIX); - resourceClient.workspaces().operation().watch(watcher); - return watcher; - } catch (Exception e) { - LOGGER.error( - formatLogMessage(DefaultTheiaCloudOperatorLauncher.COR_ID_INIT, "Error while initializing workspace watch"), - e); - System.exit(-1); - throw new IllegalStateException(); - } - } - - protected SpecWatch initSessionsAndWatchForChanges() { - try { - resourceClient.sessions().list().forEach(this::initSession); - SpecWatch watcher = new SpecWatch<>(sessionCache, this::handleSessionEvent, "Session", - COR_ID_SESSIONPREFIX); - resourceClient.sessions().operation().watch(watcher); - return watcher; - } catch (Exception e) { - LOGGER.error( - formatLogMessage(DefaultTheiaCloudOperatorLauncher.COR_ID_INIT, "Error while initializing session watch"), e); - System.exit(-1); - throw new IllegalStateException(); - } - } - - protected void initAppDefinition(AppDefinition resource) { - appDefinitionCache.put(resource.getMetadata().getUid(), resource); - String uid = resource.getMetadata().getUid(); - handleAppDefnitionEvent(Watcher.Action.ADDED, uid, DefaultTheiaCloudOperatorLauncher.COR_ID_INIT); - } - - protected void initWorkspace(Workspace resource) { - workspaceCache.put(resource.getMetadata().getUid(), resource); - String uid = resource.getMetadata().getUid(); - handleWorkspaceEvent(Watcher.Action.ADDED, uid, DefaultTheiaCloudOperatorLauncher.COR_ID_INIT); - } - - protected void initSession(Session resource) { - sessionCache.put(resource.getMetadata().getUid(), resource); - String uid = resource.getMetadata().getUid(); - handleSessionEvent(Watcher.Action.ADDED, uid, DefaultTheiaCloudOperatorLauncher.COR_ID_INIT); - } - - protected void handleAppDefnitionEvent(Watcher.Action action, String uid, String correlationId) { - try { - AppDefinition appDefinition = appDefinitionCache.get(uid); - switch (action) { - case ADDED: - appDefinitionAddedHandler.appDefinitionAdded(appDefinition, correlationId); - break; - case DELETED: - appDefinitionAddedHandler.appDefinitionDeleted(appDefinition, correlationId); - break; - case MODIFIED: - appDefinitionAddedHandler.appDefinitionModified(appDefinition, correlationId); - break; - case ERROR: - appDefinitionAddedHandler.appDefinitionErrored(appDefinition, correlationId); - break; - case BOOKMARK: - appDefinitionAddedHandler.appDefinitionBookmarked(appDefinition, correlationId); - break; - } - } catch (Exception e) { - LOGGER.error(formatLogMessage(correlationId, "Error while handling app definitions"), e); - } - } - - protected void handleSessionEvent(Watcher.Action action, String uid, String correlationId) { - try { - Session session = sessionCache.get(uid); - switch (action) { - case ADDED: - sessionHandler.sessionAdded(session, correlationId); - break; - case DELETED: - sessionHandler.sessionDeleted(session, correlationId); - break; - case MODIFIED: - sessionHandler.sessionModified(session, correlationId); - break; - case ERROR: - sessionHandler.sessionErrored(session, correlationId); - break; - case BOOKMARK: - sessionHandler.sessionBookmarked(session, correlationId); - break; - } - } catch (Exception e) { - LOGGER.error(formatLogMessage(correlationId, "Error while handling sessions"), e); - if (!arguments.isContinueOnException()) { - System.exit(-1); - } - } - } - - protected void handleWorkspaceEvent(Watcher.Action action, String uid, String correlationId) { - try { - Workspace workspace = workspaceCache.get(uid); - switch (action) { - case ADDED: - workspaceHandler.workspaceAdded(workspace, correlationId); - break; - case DELETED: - workspaceHandler.workspaceDeleted(workspace, correlationId); - break; - case MODIFIED: - workspaceHandler.workspaceModified(workspace, correlationId); - break; - case ERROR: - workspaceHandler.workspaceErrored(workspace, correlationId); - break; - case BOOKMARK: - workspaceHandler.workspaceBookmarked(workspace, correlationId); - break; - } - } catch (Exception e) { - LOGGER.error(formatLogMessage(correlationId, "Error while handling workspaces"), e); - if (!arguments.isContinueOnException()) { - System.exit(-1); - } - } - } - - protected void stopTimedOutSessions() { - String correlationId = generateCorrelationId(); - - try { - Set timedOutSessions = new LinkedHashSet<>(); - Instant now = Instant.now(); - for (Session session : resourceClient.sessions().list()) { - if (isSessionTimedOut(correlationId, now, session)) { - timedOutSessions.add(session.getSpec().getName()); - } - } - - for (String sessionName : timedOutSessions) { - resourceClient.sessions().delete(COR_ID_TIMEOUTPREFIX + correlationId, sessionName); - } - } catch (Exception e) { - LOGGER.error(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, "Exception in kill after runnable"), e); - if (!arguments.isContinueOnException()) { - System.exit(-1); - } - } - } - - /** - * If watches have not been called at all (neither reconnecting calls or actual - * actions), this might mean that the watch can't communicate with the kube API - * anymore. In this case we want to hand over to a different operator which will - * start up fresh watches. - */ - protected void lookForIdleWatches() { - String correlationId = generateCorrelationId(); - long now = System.currentTimeMillis(); - for (SpecWatch watch : watches) { - long idleForMs = now - watch.getLastActive(); - LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, - watch.getResourceName() + " watch was idle for " + idleForMs + " ms")); - if (idleForMs > arguments.getMaxWatchIdleTime()) { - LOGGER.error(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, watch.getResourceName() - + " was idle for too long and is assumed to be disconnected. Exit operator..")); - System.exit(-1); - } - } - } - - protected boolean isSessionTimedOut(String correlationId, Instant now, Session session) { - Optional timeout = resourceClient.appDefinitions().get(session.getSpec().getAppDefinition()) - .map(appDef -> appDef.getSpec().getTimeout()); - if (timeout.isEmpty() || timeout.get() <= 0) { - LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, - "Session " + session.getSpec().getName() + " will not be stopped automatically [NoTimeout].")); - return false; - } - int limit = timeout.get(); - String creationTimestamp = session.getMetadata().getCreationTimestamp(); - Instant parse = Instant.parse(creationTimestamp); - long minutesSinceCreation = ChronoUnit.MINUTES.between(parse, now); - LOGGER.trace(formatLogMessage(correlationId, "Checking " + session.getSpec().getName() - + ". minutesSinceLastActivity: " + minutesSinceCreation + ". limit: " + limit)); - if (minutesSinceCreation > limit) { - LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, - "Session " + session.getSpec().getName() + " was stopped after " + limit + " minutes.")); - return true; - } else { - LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, "Session " + session.getSpec().getName() - + " will keep running until the limit of " + limit + " is hit.")); - } - return false; - } -} diff --git a/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorLauncher.java b/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorLauncher.java index 7ff7e3ce..fa732eb1 100644 --- a/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorLauncher.java +++ b/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorLauncher.java @@ -15,122 +15,16 @@ ********************************************************************************/ package org.eclipse.theia.cloud.defaultoperator; -import static org.eclipse.theia.cloud.common.util.LogMessageUtil.formatLogMessage; - -import java.time.Duration; -import java.util.UUID; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.eclipse.theia.cloud.operator.TheiaCloudOperator; +import org.eclipse.theia.cloud.operator.LeaderElectionTheiaCloudOperatorLauncher; import org.eclipse.theia.cloud.operator.TheiaCloudOperatorArguments; -import org.eclipse.theia.cloud.operator.TheiaCloudOperatorLauncher; import org.eclipse.theia.cloud.operator.di.AbstractTheiaCloudOperatorModule; -import com.google.inject.Guice; -import com.google.inject.Injector; - -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientBuilder; -import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; -import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; -import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder; -import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; -import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; -import picocli.CommandLine; - -public class DefaultTheiaCloudOperatorLauncher implements TheiaCloudOperatorLauncher { - - private static final Logger LOGGER = LogManager.getLogger(DefaultTheiaCloudOperatorLauncher.class); - - protected static final String LEASE_LOCK_NAME = "theia-cloud-operator-leaders"; - - static final String COR_ID_INIT = "init"; +public class DefaultTheiaCloudOperatorLauncher extends LeaderElectionTheiaCloudOperatorLauncher { public static void main(String[] args) throws InterruptedException { new DefaultTheiaCloudOperatorLauncher().runMain(args); } - private TheiaCloudOperatorArguments args; - - @Override - public void runMain(String[] args) throws InterruptedException { - this.args = createArguments(args); - - long leaseDurationInSeconds = this.args.getLeaderLeaseDuration(); - long renewDeadlineInSeconds = this.args.getLeaderRenewDeadline(); - long retryPeriodInSeconds = this.args.getLeaderRetryPeriod(); - - final String lockIdentity = UUID.randomUUID().toString(); - - LOGGER.info(formatLogMessage(COR_ID_INIT, - "Launching Theia Cloud Leader Election now. Own lock identity is " + lockIdentity)); - Config k8sConfig = new ConfigBuilder().build(); - - try (KubernetesClient k8sClient = new KubernetesClientBuilder().withConfig(k8sConfig).build()) { - String leaseLockNamespace = k8sClient.getNamespace(); - - LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfigBuilder()// - .withReleaseOnCancel(true)// - .withName("Theia Cloud Operator Leader Election")// - - // non leaders will check after this time if they can become leader - .withLeaseDuration(Duration.ofSeconds(leaseDurationInSeconds))// - - // time the current leader tries to refresh the lease before giving up - .withRenewDeadline(Duration.ofSeconds(renewDeadlineInSeconds)) - - // time each client should wait before performing the next action - .withRetryPeriod(Duration.ofSeconds(retryPeriodInSeconds))// - - .withLock(new LeaseLock(leaseLockNamespace, LEASE_LOCK_NAME, lockIdentity))// - .withLeaderCallbacks(new LeaderCallbacks(DefaultTheiaCloudOperatorLauncher.this::onStartLeading, - DefaultTheiaCloudOperatorLauncher.this::onStopLeading, - DefaultTheiaCloudOperatorLauncher.this::onNewLeader))// - .build(); - LeaderElector leaderElector = k8sClient.leaderElector().withConfig(leaderElectionConfig).build(); - leaderElector.run(); - } - - LOGGER.info(formatLogMessage(COR_ID_INIT, "Theia Cloud Leader Election Loop Ended")); - } - - private void onStartLeading() { - LOGGER.info(formatLogMessage(COR_ID_INIT, "Elected as new leader!")); - startOperatorAsLeader(args); - } - - private void onStopLeading() { - LOGGER.info(formatLogMessage(COR_ID_INIT, "Removed as leader!")); - System.exit(0); - } - - private void onNewLeader(String newLeader) { - LOGGER.info(formatLogMessage(COR_ID_INIT, newLeader + " is the new leader.")); - } - - protected void startOperatorAsLeader(TheiaCloudOperatorArguments arguments) { - AbstractTheiaCloudOperatorModule module = createModule(arguments); - LOGGER.info(formatLogMessage(COR_ID_INIT, "Using " + module.getClass().getName() + " as DI module")); - - Injector injector = Guice.createInjector(module); - TheiaCloudOperator theiaCloud = injector.getInstance(TheiaCloudOperator.class); - - LOGGER.info(formatLogMessage(COR_ID_INIT, "Launching Theia Cloud Now")); - theiaCloud.start(); - } - - @Override - public TheiaCloudOperatorArguments createArguments(String[] args) { - TheiaCloudOperatorArguments arguments = new TheiaCloudOperatorArguments(); - CommandLine commandLine = new CommandLine(arguments).setTrimQuotes(true); - commandLine.parseArgs(args); - LOGGER.info(formatLogMessage(COR_ID_INIT, "Parsed args: " + arguments)); - return arguments; - } - @Override public AbstractTheiaCloudOperatorModule createModule(TheiaCloudOperatorArguments arguments) { return new DefaultTheiaCloudOperatorModule(arguments); diff --git a/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorModule.java b/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorModule.java index 842c1698..fcf781e4 100644 --- a/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorModule.java +++ b/java/operator/org.eclipse.theia.cloud.defaultoperator/src/main/java/org/eclipse/theia/cloud/defaultoperator/DefaultTheiaCloudOperatorModule.java @@ -16,71 +16,20 @@ ********************************************************************************/ package org.eclipse.theia.cloud.defaultoperator; -import org.eclipse.theia.cloud.operator.TheiaCloudOperatorArguments; +import org.eclipse.theia.cloud.operator.BasicTheiaCloudOperator; import org.eclipse.theia.cloud.operator.TheiaCloudOperator; +import org.eclipse.theia.cloud.operator.TheiaCloudOperatorArguments; import org.eclipse.theia.cloud.operator.di.AbstractTheiaCloudOperatorModule; -import org.eclipse.theia.cloud.operator.handler.appdef.AppDefinitionHandler; -import org.eclipse.theia.cloud.operator.handler.appdef.EagerStartAppDefinitionAddedHandler; -import org.eclipse.theia.cloud.operator.handler.appdef.LazyStartAppDefinitionHandler; -import org.eclipse.theia.cloud.operator.handler.session.EagerStartSessionHandler; -import org.eclipse.theia.cloud.operator.handler.session.LazySessionHandler; -import org.eclipse.theia.cloud.operator.handler.session.SessionHandler; -import org.eclipse.theia.cloud.operator.handler.ws.LazyWorkspaceHandler; -import org.eclipse.theia.cloud.operator.handler.ws.WorkspaceHandler; -import org.eclipse.theia.cloud.operator.pv.MinikubePersistentVolumeCreator; -import org.eclipse.theia.cloud.operator.pv.PersistentVolumeCreator; public class DefaultTheiaCloudOperatorModule extends AbstractTheiaCloudOperatorModule { - private TheiaCloudOperatorArguments arguments; - public DefaultTheiaCloudOperatorModule(TheiaCloudOperatorArguments arguments) { - this.arguments = arguments; - } - - @Override - protected void configure() { - bind(TheiaCloudOperatorArguments.class).toInstance(arguments); - super.configure(); - } - - @Override - protected Class bindPersistentVolumeHandler() { - switch (arguments.getCloudProvider()) { - case MINIKUBE: - return MinikubePersistentVolumeCreator.class; - case K8S: - default: - return super.bindPersistentVolumeHandler(); - } - } - - @Override - protected Class bindAppDefinitionHandler() { - if (arguments.isEagerStart()) { - return EagerStartAppDefinitionAddedHandler.class; - } else { - return LazyStartAppDefinitionHandler.class; - } - } - - @Override - protected Class bindWorkspaceHandler() { - return LazyWorkspaceHandler.class; - } - - @Override - protected Class bindSessionHandler() { - if (arguments.isEagerStart()) { - return EagerStartSessionHandler.class; - } else { - return LazySessionHandler.class; - } + super(arguments); } @Override protected Class bindTheiaCloudOperator() { - return DefaultTheiaCloudOperator.class; + return BasicTheiaCloudOperator.class; } } diff --git a/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/BasicTheiaCloudOperator.java b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/BasicTheiaCloudOperator.java new file mode 100644 index 00000000..39429263 --- /dev/null +++ b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/BasicTheiaCloudOperator.java @@ -0,0 +1,308 @@ +/******************************************************************************** + * Copyright (C) 2023 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +package org.eclipse.theia.cloud.operator; + +import static org.eclipse.theia.cloud.common.util.LogMessageUtil.formatLogMessage; +import static org.eclipse.theia.cloud.common.util.LogMessageUtil.generateCorrelationId; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.theia.cloud.common.k8s.client.TheiaCloudClient; +import org.eclipse.theia.cloud.common.k8s.resource.appdefinition.AppDefinition; +import org.eclipse.theia.cloud.common.k8s.resource.session.Session; +import org.eclipse.theia.cloud.common.k8s.resource.workspace.Workspace; +import org.eclipse.theia.cloud.operator.handler.appdef.AppDefinitionHandler; +import org.eclipse.theia.cloud.operator.handler.session.SessionHandler; +import org.eclipse.theia.cloud.operator.handler.ws.WorkspaceHandler; +import org.eclipse.theia.cloud.operator.plugins.OperatorPlugin; +import org.eclipse.theia.cloud.operator.util.SpecWatch; + +import com.google.inject.Inject; + +import io.fabric8.kubernetes.client.Watcher; + +public class BasicTheiaCloudOperator implements TheiaCloudOperator { + + private static final ScheduledExecutorService STOP_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService WATCH_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + + private static final Logger LOGGER = LogManager.getLogger(BasicTheiaCloudOperator.class); + + private static final String COR_ID_APPDEFINITIONPREFIX = "appdefinition-watch-"; + private static final String COR_ID_WORKSPACEPREFIX = "workspace-watch-"; + private static final String COR_ID_SESSIONPREFIX = "session-watch-"; + private static final String COR_ID_TIMEOUTPREFIX = "timeout-"; + + @Inject + private TheiaCloudClient resourceClient; + + @Inject + protected Set operatorPlugins; + + @Inject + private AppDefinitionHandler appDefinitionAddedHandler; + + @Inject + private WorkspaceHandler workspaceHandler; + + @Inject + private SessionHandler sessionHandler; + + @Inject + private TheiaCloudOperatorArguments arguments; + + private final Map appDefinitionCache = new ConcurrentHashMap<>(); + private final Map workspaceCache = new ConcurrentHashMap<>(); + private final Map sessionCache = new ConcurrentHashMap<>(); + private final Set> watches = new LinkedHashSet<>(); + + @Override + public void start() { + this.operatorPlugins.forEach(plugin -> plugin.start()); + watches.add(initAppDefinitionsAndWatchForChanges()); + watches.add(initWorkspacesAndWatchForChanges()); + watches.add(initSessionsAndWatchForChanges()); + + STOP_EXECUTOR.scheduleWithFixedDelay(this::stopTimedOutSessions, 1, 1, TimeUnit.MINUTES); + WATCH_EXECUTOR.scheduleWithFixedDelay(this::lookForIdleWatches, 1, 1, TimeUnit.MINUTES); + } + + protected SpecWatch initAppDefinitionsAndWatchForChanges() { + try { + resourceClient.appDefinitions().list().forEach(this::initAppDefinition); + SpecWatch watcher = new SpecWatch<>(appDefinitionCache, this::handleAppDefnitionEvent, + "App Definition", COR_ID_APPDEFINITIONPREFIX); + resourceClient.appDefinitions().operation().watch(watcher); + return watcher; + } catch (Exception e) { + LOGGER.error(formatLogMessage(TheiaCloudOperatorLauncher.COR_ID_INIT, + "Error while initializing app definitions watch"), e); + System.exit(-1); + throw new IllegalStateException(); + } + } + + protected SpecWatch initWorkspacesAndWatchForChanges() { + try { + resourceClient.workspaces().list().forEach(this::initWorkspace); + SpecWatch watcher = new SpecWatch<>(workspaceCache, this::handleWorkspaceEvent, "Workspace", + COR_ID_WORKSPACEPREFIX); + resourceClient.workspaces().operation().watch(watcher); + return watcher; + } catch (Exception e) { + LOGGER.error(formatLogMessage(TheiaCloudOperatorLauncher.COR_ID_INIT, + "Error while initializing workspace watch"), e); + System.exit(-1); + throw new IllegalStateException(); + } + } + + protected SpecWatch initSessionsAndWatchForChanges() { + try { + resourceClient.sessions().list().forEach(this::initSession); + SpecWatch watcher = new SpecWatch<>(sessionCache, this::handleSessionEvent, "Session", + COR_ID_SESSIONPREFIX); + resourceClient.sessions().operation().watch(watcher); + return watcher; + } catch (Exception e) { + LOGGER.error( + formatLogMessage(TheiaCloudOperatorLauncher.COR_ID_INIT, "Error while initializing session watch"), + e); + System.exit(-1); + throw new IllegalStateException(); + } + } + + protected void initAppDefinition(AppDefinition resource) { + appDefinitionCache.put(resource.getMetadata().getUid(), resource); + String uid = resource.getMetadata().getUid(); + handleAppDefnitionEvent(Watcher.Action.ADDED, uid, TheiaCloudOperatorLauncher.COR_ID_INIT); + } + + protected void initWorkspace(Workspace resource) { + workspaceCache.put(resource.getMetadata().getUid(), resource); + String uid = resource.getMetadata().getUid(); + handleWorkspaceEvent(Watcher.Action.ADDED, uid, TheiaCloudOperatorLauncher.COR_ID_INIT); + } + + protected void initSession(Session resource) { + sessionCache.put(resource.getMetadata().getUid(), resource); + String uid = resource.getMetadata().getUid(); + handleSessionEvent(Watcher.Action.ADDED, uid, TheiaCloudOperatorLauncher.COR_ID_INIT); + } + + protected void handleAppDefnitionEvent(Watcher.Action action, String uid, String correlationId) { + try { + AppDefinition appDefinition = appDefinitionCache.get(uid); + switch (action) { + case ADDED: + appDefinitionAddedHandler.appDefinitionAdded(appDefinition, correlationId); + break; + case DELETED: + appDefinitionAddedHandler.appDefinitionDeleted(appDefinition, correlationId); + break; + case MODIFIED: + appDefinitionAddedHandler.appDefinitionModified(appDefinition, correlationId); + break; + case ERROR: + appDefinitionAddedHandler.appDefinitionErrored(appDefinition, correlationId); + break; + case BOOKMARK: + appDefinitionAddedHandler.appDefinitionBookmarked(appDefinition, correlationId); + break; + } + } catch (Exception e) { + LOGGER.error(formatLogMessage(correlationId, "Error while handling app definitions"), e); + } + } + + protected void handleSessionEvent(Watcher.Action action, String uid, String correlationId) { + try { + Session session = sessionCache.get(uid); + switch (action) { + case ADDED: + sessionHandler.sessionAdded(session, correlationId); + break; + case DELETED: + sessionHandler.sessionDeleted(session, correlationId); + break; + case MODIFIED: + sessionHandler.sessionModified(session, correlationId); + break; + case ERROR: + sessionHandler.sessionErrored(session, correlationId); + break; + case BOOKMARK: + sessionHandler.sessionBookmarked(session, correlationId); + break; + } + } catch (Exception e) { + LOGGER.error(formatLogMessage(correlationId, "Error while handling sessions"), e); + if (!arguments.isContinueOnException()) { + System.exit(-1); + } + } + } + + protected void handleWorkspaceEvent(Watcher.Action action, String uid, String correlationId) { + try { + Workspace workspace = workspaceCache.get(uid); + switch (action) { + case ADDED: + workspaceHandler.workspaceAdded(workspace, correlationId); + break; + case DELETED: + workspaceHandler.workspaceDeleted(workspace, correlationId); + break; + case MODIFIED: + workspaceHandler.workspaceModified(workspace, correlationId); + break; + case ERROR: + workspaceHandler.workspaceErrored(workspace, correlationId); + break; + case BOOKMARK: + workspaceHandler.workspaceBookmarked(workspace, correlationId); + break; + } + } catch (Exception e) { + LOGGER.error(formatLogMessage(correlationId, "Error while handling workspaces"), e); + if (!arguments.isContinueOnException()) { + System.exit(-1); + } + } + } + + protected void stopTimedOutSessions() { + String correlationId = generateCorrelationId(); + + try { + Set timedOutSessions = new LinkedHashSet<>(); + Instant now = Instant.now(); + for (Session session : resourceClient.sessions().list()) { + if (isSessionTimedOut(correlationId, now, session)) { + timedOutSessions.add(session.getSpec().getName()); + } + } + + for (String sessionName : timedOutSessions) { + resourceClient.sessions().delete(COR_ID_TIMEOUTPREFIX + correlationId, sessionName); + } + } catch (Exception e) { + LOGGER.error(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, "Exception in kill after runnable"), e); + if (!arguments.isContinueOnException()) { + System.exit(-1); + } + } + } + + /** + * If watches have not been called at all (neither reconnecting calls or actual + * actions), this might mean that the watch can't communicate with the kube API + * anymore. In this case we want to hand over to a different operator which will + * start up fresh watches. + */ + protected void lookForIdleWatches() { + String correlationId = generateCorrelationId(); + long now = System.currentTimeMillis(); + for (SpecWatch watch : watches) { + long idleForMs = now - watch.getLastActive(); + LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, + watch.getResourceName() + " watch was idle for " + idleForMs + " ms")); + if (idleForMs > arguments.getMaxWatchIdleTime()) { + LOGGER.error(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, watch.getResourceName() + + " was idle for too long and is assumed to be disconnected. Exit operator..")); + System.exit(-1); + } + } + } + + protected boolean isSessionTimedOut(String correlationId, Instant now, Session session) { + Optional timeout = resourceClient.appDefinitions().get(session.getSpec().getAppDefinition()) + .map(appDef -> appDef.getSpec().getTimeout()); + if (timeout.isEmpty() || timeout.get() <= 0) { + LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, + "Session " + session.getSpec().getName() + " will not be stopped automatically [NoTimeout].")); + return false; + } + int limit = timeout.get(); + String creationTimestamp = session.getMetadata().getCreationTimestamp(); + Instant parse = Instant.parse(creationTimestamp); + long minutesSinceCreation = ChronoUnit.MINUTES.between(parse, now); + LOGGER.trace(formatLogMessage(correlationId, "Checking " + session.getSpec().getName() + + ". minutesSinceLastActivity: " + minutesSinceCreation + ". limit: " + limit)); + if (minutesSinceCreation > limit) { + LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, + "Session " + session.getSpec().getName() + " was stopped after " + limit + " minutes.")); + return true; + } else { + LOGGER.trace(formatLogMessage(COR_ID_TIMEOUTPREFIX, correlationId, "Session " + session.getSpec().getName() + + " will keep running until the limit of " + limit + " is hit.")); + } + return false; + } + +} diff --git a/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/LeaderElectionTheiaCloudOperatorLauncher.java b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/LeaderElectionTheiaCloudOperatorLauncher.java new file mode 100644 index 00000000..3bacdb6f --- /dev/null +++ b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/LeaderElectionTheiaCloudOperatorLauncher.java @@ -0,0 +1,124 @@ +/******************************************************************************** + * Copyright (C) 2023 EclipseSource and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ +package org.eclipse.theia.cloud.operator; + +import static org.eclipse.theia.cloud.common.util.LogMessageUtil.formatLogMessage; + +import java.time.Duration; +import java.util.UUID; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.eclipse.theia.cloud.operator.di.AbstractTheiaCloudOperatorModule; + +import com.google.inject.Guice; +import com.google.inject.Injector; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector; +import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock; + +public abstract class LeaderElectionTheiaCloudOperatorLauncher extends TheiaCloudOperatorLauncher { + + private static final Logger LOGGER = LogManager.getLogger(LeaderElectionTheiaCloudOperatorLauncher.class); + + protected static final String LEASE_LOCK_NAME = "theia-cloud-operator-leaders"; + + @Override + public void runMain(String[] args) throws InterruptedException { + this.args = createArguments(args); + + LOGGER.info(formatLogMessage(COR_ID_INIT, + "Launching Theia Cloud Leader Election now")); + + this.runLeaderElection(this.args); + + LOGGER.info(formatLogMessage(COR_ID_INIT, "Theia Cloud Leader Election Loop Ended")); + } + + protected void runLeaderElection(TheiaCloudOperatorArguments args) { + final String lockIdentity = UUID.randomUUID().toString(); + LOGGER.info(formatLogMessage(COR_ID_INIT, + "Own lock identity is " + lockIdentity)); + + long leaseDurationInSeconds = this.args.getLeaderLeaseDuration(); + long renewDeadlineInSeconds = this.args.getLeaderRenewDeadline(); + long retryPeriodInSeconds = this.args.getLeaderRetryPeriod(); + + Config k8sConfig = new ConfigBuilder().build(); + + try (KubernetesClient k8sClient = new KubernetesClientBuilder().withConfig(k8sConfig).build()) { + String leaseLockNamespace = k8sClient.getNamespace(); + + LeaderElectionConfig leaderElectionConfig = new LeaderElectionConfigBuilder()// + .withReleaseOnCancel(true)// + .withName("Theia Cloud Operator Leader Election")// + + // non leaders will check after this time if they can become leader + .withLeaseDuration(Duration.ofSeconds(leaseDurationInSeconds))// + + // time the current leader tries to refresh the lease before giving up + .withRenewDeadline(Duration.ofSeconds(renewDeadlineInSeconds)) + + // time each client should wait before performing the next action + .withRetryPeriod(Duration.ofSeconds(retryPeriodInSeconds))// + + .withLock(new LeaseLock(leaseLockNamespace, LEASE_LOCK_NAME, lockIdentity))// + .withLeaderCallbacks( + new LeaderCallbacks(LeaderElectionTheiaCloudOperatorLauncher.this::onStartLeading, + LeaderElectionTheiaCloudOperatorLauncher.this::onStopLeading, + LeaderElectionTheiaCloudOperatorLauncher.this::onNewLeader))// + .build(); + LeaderElector leaderElector = k8sClient.leaderElector().withConfig(leaderElectionConfig).build(); + leaderElector.run(); + } + } + + protected void onStartLeading() { + LOGGER.info(formatLogMessage(COR_ID_INIT, "Elected as new leader!")); + startOperatorAsLeader(args); + } + + protected void onStopLeading() { + LOGGER.info(formatLogMessage(COR_ID_INIT, "Removed as leader!")); + System.exit(0); + } + + protected void onNewLeader(String newLeader) { + LOGGER.info(formatLogMessage(COR_ID_INIT, newLeader + " is the new leader.")); + } + + protected void startOperatorAsLeader(TheiaCloudOperatorArguments arguments) { + AbstractTheiaCloudOperatorModule module = createModule(arguments); + LOGGER.info(formatLogMessage(COR_ID_INIT, "Using " + module.getClass().getName() + " as DI module")); + + Injector injector = Guice.createInjector(module); + TheiaCloudOperator theiaCloud = injector.getInstance(TheiaCloudOperator.class); + + LOGGER.info(formatLogMessage(COR_ID_INIT, "Launching Theia Cloud Now")); + theiaCloud.start(); + } + + @Override + public abstract AbstractTheiaCloudOperatorModule createModule(TheiaCloudOperatorArguments arguments); + +} diff --git a/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/TheiaCloudOperatorLauncher.java b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/TheiaCloudOperatorLauncher.java index b64c6293..44eb7085 100644 --- a/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/TheiaCloudOperatorLauncher.java +++ b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/TheiaCloudOperatorLauncher.java @@ -16,13 +16,43 @@ ********************************************************************************/ package org.eclipse.theia.cloud.operator; +import static org.eclipse.theia.cloud.common.util.LogMessageUtil.formatLogMessage; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.eclipse.theia.cloud.operator.di.AbstractTheiaCloudOperatorModule; -public interface TheiaCloudOperatorLauncher { +import com.google.inject.Guice; +import com.google.inject.Injector; + +import picocli.CommandLine; + +public abstract class TheiaCloudOperatorLauncher { + + static final String COR_ID_INIT = "init"; + + private static final Logger LOGGER = LogManager.getLogger(TheiaCloudOperatorLauncher.class); + + protected TheiaCloudOperatorArguments args; + + public void runMain(String[] args) throws InterruptedException { + this.args = createArguments(args); + AbstractTheiaCloudOperatorModule module = createModule(this.args); + LOGGER.info(formatLogMessage(COR_ID_INIT, "Using " + module.getClass().getName() + " as DI module")); - abstract void runMain(String[] args) throws InterruptedException; + Injector injector = Guice.createInjector(module); + TheiaCloudOperator theiaCloud = injector.getInstance(TheiaCloudOperator.class); + LOGGER.info(formatLogMessage(COR_ID_INIT, "Launching Theia Cloud Now")); + theiaCloud.start(); + } - abstract TheiaCloudOperatorArguments createArguments(String[] args); + public TheiaCloudOperatorArguments createArguments(String[] args) { + TheiaCloudOperatorArguments arguments = new TheiaCloudOperatorArguments(); + CommandLine commandLine = new CommandLine(arguments).setTrimQuotes(true); + commandLine.parseArgs(args); + LOGGER.info(formatLogMessage(COR_ID_INIT, "Parsed args: " + arguments)); + return arguments; + } abstract AbstractTheiaCloudOperatorModule createModule(TheiaCloudOperatorArguments arguments); diff --git a/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/di/AbstractTheiaCloudOperatorModule.java b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/di/AbstractTheiaCloudOperatorModule.java index c6b5bf76..9615d07d 100644 --- a/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/di/AbstractTheiaCloudOperatorModule.java +++ b/java/operator/org.eclipse.theia.cloud.operator/src/main/java/org/eclipse/theia/cloud/operator/di/AbstractTheiaCloudOperatorModule.java @@ -25,10 +25,16 @@ import org.eclipse.theia.cloud.common.k8s.resource.workspace.Workspace; import org.eclipse.theia.cloud.common.util.CustomResourceUtil; import org.eclipse.theia.cloud.operator.TheiaCloudOperator; +import org.eclipse.theia.cloud.operator.TheiaCloudOperatorArguments; import org.eclipse.theia.cloud.operator.bandwidth.BandwidthLimiter; import org.eclipse.theia.cloud.operator.bandwidth.BandwidthLimiterImpl; import org.eclipse.theia.cloud.operator.handler.appdef.AppDefinitionHandler; +import org.eclipse.theia.cloud.operator.handler.appdef.EagerStartAppDefinitionAddedHandler; +import org.eclipse.theia.cloud.operator.handler.appdef.LazyStartAppDefinitionHandler; +import org.eclipse.theia.cloud.operator.handler.session.EagerStartSessionHandler; +import org.eclipse.theia.cloud.operator.handler.session.LazySessionHandler; import org.eclipse.theia.cloud.operator.handler.session.SessionHandler; +import org.eclipse.theia.cloud.operator.handler.ws.LazyWorkspaceHandler; import org.eclipse.theia.cloud.operator.handler.ws.WorkspaceHandler; import org.eclipse.theia.cloud.operator.ingress.IngressPathProvider; import org.eclipse.theia.cloud.operator.ingress.IngressPathProviderImpl; @@ -37,6 +43,7 @@ import org.eclipse.theia.cloud.operator.plugins.MonitorActivityTracker; import org.eclipse.theia.cloud.operator.plugins.OperatorPlugin; import org.eclipse.theia.cloud.operator.pv.DefaultPersistentVolumeCreator; +import org.eclipse.theia.cloud.operator.pv.MinikubePersistentVolumeCreator; import org.eclipse.theia.cloud.operator.pv.PersistentVolumeCreator; import org.eclipse.theia.cloud.operator.replacements.DefaultDeploymentTemplateReplacements; import org.eclipse.theia.cloud.operator.replacements.DefaultPersistentVolumeTemplateReplacements; @@ -50,6 +57,13 @@ import io.fabric8.kubernetes.client.NamespacedKubernetesClient; public abstract class AbstractTheiaCloudOperatorModule extends AbstractModule { + + protected TheiaCloudOperatorArguments arguments; + + public AbstractTheiaCloudOperatorModule(TheiaCloudOperatorArguments arguments) { + this.arguments = arguments; + } + @Override protected void configure() { bind(TheiaCloudOperator.class).to(bindTheiaCloudOperator()).in(Singleton.class); @@ -67,6 +81,7 @@ protected void configure() { configure(MultiBinding.create(OperatorPlugin.class), this::bindOperatorPlugins); bind(MonitorMessagingService.class).to(bindMonitorMessagingService()).in(Singleton.class); + bind(TheiaCloudOperatorArguments.class).toInstance(arguments); } protected void configure(final MultiBinding binding, final Consumer> configurator) { @@ -81,7 +96,13 @@ protected Class bindBandwidthLimiter() { } protected Class bindPersistentVolumeHandler() { - return DefaultPersistentVolumeCreator.class; + switch (arguments.getCloudProvider()) { + case MINIKUBE: + return MinikubePersistentVolumeCreator.class; + case K8S: + default: + return DefaultPersistentVolumeCreator.class; + } } protected Class bindIngressPathProvider() { @@ -108,11 +129,25 @@ protected Class bindPersistentVo return DefaultPersistentVolumeTemplateReplacements.class; } - protected abstract Class bindWorkspaceHandler(); + protected Class bindAppDefinitionHandler() { + if (arguments.isEagerStart()) { + return EagerStartAppDefinitionAddedHandler.class; + } else { + return LazyStartAppDefinitionHandler.class; + } + } - protected abstract Class bindAppDefinitionHandler(); + protected Class bindWorkspaceHandler() { + return LazyWorkspaceHandler.class; + } - protected abstract Class bindSessionHandler(); + protected Class bindSessionHandler() { + if (arguments.isEagerStart()) { + return EagerStartSessionHandler.class; + } else { + return LazySessionHandler.class; + } + } @Provides @Singleton