Skip to content

Commit

Permalink
fix: fix issues with run lifecycle in k8s + bump kubernetes client dep
Browse files Browse the repository at this point in the history
  • Loading branch information
matteo-s committed May 14, 2024
1 parent f42b439 commit 0ce18b9
Show file tree
Hide file tree
Showing 20 changed files with 270 additions and 200 deletions.
2 changes: 1 addition & 1 deletion application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<spring-cloud.version>2023.0.0</spring-cloud.version>
<springdoc.version>2.2.0</springdoc.version>
<jackson.version>2.15.3</jackson.version>
<kubernetes.version>19.0.0</kubernetes.version>
<kubernetes.version>20.0.0-legacy</kubernetes.version>
<postgresql.version>42.7.2</postgresql.version>
<mysql.version>8.0.33</mysql.version>
<checkstyle.config.location>../checkstyle/checkstyle.xml</checkstyle.config.location>
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,47 @@ public Run delete(@NotNull Run run) throws NoSuchEntityException {
// Create Runnable
RunRunnable runnable = runtime.delete(run);

return Optional.ofNullable(runnable);
} else {
return Optional.empty();
}
});
fsm
.getState(State.ERROR)
.getTransaction(RunEvent.DELETING)
.setInternalLogic((context, input, stateMachine) -> {
if (!Optional.ofNullable(runBaseSpec.getLocalExecution()).orElse(Boolean.FALSE)) {
// Retrieve Runtime and build run
Runtime<
? extends ExecutableBaseSpec,
? extends RunBaseSpec,
? extends RunBaseStatus,
? extends RunRunnable
> runtime = runtimeFactory.getRuntime(executable.getKind());
// Create Runnable
RunRunnable runnable = runtime.delete(run);

return Optional.ofNullable(runnable);
} else {
return Optional.empty();
}
});

fsm
.getState(State.COMPLETED)
.getTransaction(RunEvent.DELETING)
.setInternalLogic((context, input, stateMachine) -> {
if (!Optional.ofNullable(runBaseSpec.getLocalExecution()).orElse(Boolean.FALSE)) {
// Retrieve Runtime and build run
Runtime<
? extends ExecutableBaseSpec,
? extends RunBaseSpec,
? extends RunBaseStatus,
? extends RunRunnable
> runtime = runtimeFactory.getRuntime(executable.getKind());
// Create Runnable
RunRunnable runnable = runtime.delete(run);

return Optional.ofNullable(runnable);
} else {
return Optional.empty();
Expand Down Expand Up @@ -373,10 +414,12 @@ public void onChangedEvent(RunnableChangedEvent<RunRunnable> event) throws Store
run -> {
try {
if (
//either signal an update or track progress (running state)
!Objects.equals(
StatusFieldAccessor.with(run.getStatus()).getState(),
runnableMonitorObject.getStateId()
)
) ||
State.RUNNING == State.valueOf(runnableMonitorObject.getStateId())
) {
switch (State.valueOf(runnableMonitorObject.getStateId())) {
case COMPLETED:
Expand Down Expand Up @@ -452,7 +495,21 @@ private void onRunning(Run run, RunnableChangedEvent<RunRunnable> event)
RunBaseStatus runStatus = runtime.onRunning(run, runnable);
return Optional.ofNullable(runStatus);
});
fsm
.getState(State.RUNNING)
.getTransaction(RunEvent.LOOP)
.setInternalLogic((context, input, fsmInstance) -> {
log.info(
"Executing internal logic for state RUNNING, " + "event :{}, context: {}, input: {}",
RunEvent.LOOP,
context,
input
);

RunRunnable runnable = event != null ? event.getRunnable() : null;
RunBaseStatus runStatus = runtime.onRunning(run, runnable);
return Optional.ofNullable(runStatus);
});
try {
Optional<RunBaseStatus> runStatus = fsm.goToState(State.RUNNING, null);
runStatus.ifPresentOrElse(
Expand All @@ -468,7 +525,7 @@ private void onRunning(Run run, RunnableChangedEvent<RunRunnable> event)
);
entityService.update(run.getId(), run);
} catch (InvalidTransactionException e) {
log.debug("Invalid transaction from state {} to state {}", State.READY, State.RUNNING);
log.debug("Invalid transaction from state {} to state {}", e.getFromState(), e.getToState());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package it.smartcommunitylabdhub.core.config;

import it.smartcommunitylabdhub.commons.infrastructure.Builder;
import it.smartcommunitylabdhub.commons.infrastructure.RunRunnable;
import it.smartcommunitylabdhub.commons.infrastructure.Runner;
import it.smartcommunitylabdhub.commons.infrastructure.Runtime;
import it.smartcommunitylabdhub.commons.models.base.ExecutableBaseSpec;
import it.smartcommunitylabdhub.commons.models.entities.run.RunBaseSpec;
import it.smartcommunitylabdhub.commons.models.entities.run.RunBaseStatus;
import it.smartcommunitylabdhub.commons.models.entities.task.TaskBaseSpec;
import it.smartcommunitylabdhub.commons.services.RunnableStore;
import it.smartcommunitylabdhub.core.components.infrastructure.factories.builders.BuilderFactory;
import it.smartcommunitylabdhub.core.components.infrastructure.factories.runners.RunnerFactory;
import it.smartcommunitylabdhub.core.components.infrastructure.factories.runtimes.RuntimeFactory;
import it.smartcommunitylabdhub.core.repositories.RunnableRepository;
import it.smartcommunitylabdhub.core.services.RunnableStoreImpl;
Expand All @@ -30,18 +25,6 @@ protected RuntimeFactory runtimeFactory(
return new RuntimeFactory(runtimes);
}

@Bean
protected BuilderFactory builderFactory(
List<Builder<? extends ExecutableBaseSpec, ? extends TaskBaseSpec, ? extends RunBaseSpec>> builders
) {
return new BuilderFactory(builders);
}

@Bean
protected RunnerFactory runnerFactory(List<Runner<? extends RunRunnable>> runners) {
return new RunnerFactory(runners);
}

@Bean
protected RunnableStore.StoreSupplier runnableStoreService(RunnableRepository runnableRepository) {
return new RunnableStore.StoreSupplier() {
Expand Down
2 changes: 1 addition & 1 deletion modules/framework-k8s/.flattened-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
<version>19.0.0</version>
<version>20.0.0-legacy</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package it.smartcommunitylabdhub.framework.k8s.infrastructure.k8s;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1EnvFromSource;
Expand All @@ -13,8 +15,12 @@
import io.kubernetes.client.openapi.models.V1VolumeMount;
import it.smartcommunitylabdhub.commons.config.ApplicationProperties;
import it.smartcommunitylabdhub.commons.infrastructure.Framework;
import it.smartcommunitylabdhub.commons.jackson.JacksonMapper;
import it.smartcommunitylabdhub.commons.jackson.mixins.ConcreteSpecMixin;
import it.smartcommunitylabdhub.commons.models.base.BaseSpec;
import it.smartcommunitylabdhub.commons.models.enums.State;
import it.smartcommunitylabdhub.framework.k8s.exceptions.K8sFrameworkException;
import it.smartcommunitylabdhub.framework.k8s.jackson.IntOrStringMixin;
import it.smartcommunitylabdhub.framework.k8s.kubernetes.K8sBuilderHelper;
import it.smartcommunitylabdhub.framework.k8s.kubernetes.K8sSecretHelper;
import it.smartcommunitylabdhub.framework.k8s.objects.CoreLabel;
Expand All @@ -41,6 +47,12 @@
public abstract class K8sBaseFramework<T extends K8sRunnable, K extends KubernetesObject>
implements Framework<T>, InitializingBean {

//custom object mapper with mixIn for IntOrString
protected static final ObjectMapper mapper = JacksonMapper.CUSTOM_OBJECT_MAPPER.addMixIn(
IntOrString.class,
IntOrStringMixin.class
);

protected final CoreV1Api coreV1Api;

protected ApplicationProperties applicationProperties;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package it.smartcommunitylabdhub.framework.k8s.infrastructure.k8s;

import com.fasterxml.jackson.core.type.TypeReference;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.BatchV1Api;
Expand All @@ -15,6 +16,8 @@
import it.smartcommunitylabdhub.framework.k8s.runnables.K8sCronJobRunnable;
import it.smartcommunitylabdhub.framework.k8s.runnables.K8sJobRunnable;
import jakarta.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -28,6 +31,9 @@ public class K8sCronJobFramework extends K8sBaseFramework<K8sCronJobRunnable, V1

public static final String FRAMEWORK = "k8scronjob";

private static final TypeReference<HashMap<String, Serializable>> typeRef = new TypeReference<
HashMap<String, Serializable>
>() {};
private final BatchV1Api batchV1Api;

@Autowired
Expand All @@ -54,6 +60,13 @@ public K8sCronJobRunnable run(K8sCronJobRunnable runnable) throws K8sFrameworkEx
// Update runnable state..
runnable.setState(State.RUNNING.name());

//update results
try {
runnable.setResults(Map.of("cronJob", mapper.convertValue(job, typeRef)));
} catch (IllegalArgumentException e) {
log.error("error reading k8s results: {}", e.getMessage());
}

return runnable;
}

Expand All @@ -78,7 +91,7 @@ public K8sCronJobRunnable delete(K8sCronJobRunnable runnable) throws K8sFramewor
return runnable;
}
//secrets
cleanRunSecret(runnable);
cleanRunSecret(runnable);

delete(job);
runnable.setState(State.DELETED.name());
Expand Down
Loading

0 comments on commit 0ce18b9

Please sign in to comment.