Skip to content

Commit

Permalink
feat: add locking to run manager to fix fsm concurrency + add error m…
Browse files Browse the repository at this point in the history
…essages to status
  • Loading branch information
matteo-s committed Sep 19, 2024
1 parent 4e8a4d3 commit 216d5b2
Show file tree
Hide file tree
Showing 16 changed files with 622 additions and 339 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,22 @@
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@RunProcessorType(stages = { "onRunning", "onCompleted", "onError", "onStopped", "onDeleted" }, id = K8sProcessor.ID)
@RunProcessorType(
stages = {
"onBuilt",
"onReady",
"onRunning",
"onCompleted",
"onError",
"onStopping",
"onStopped",
"onDeleting",
"onDeleted",
},
id = K8sProcessor.ID
)
@Component(RunTransitionsProcessor.ID)
@Slf4j
public class RunTransitionsProcessor implements RunProcessor<RunTransitionsSpec> {
Expand Down Expand Up @@ -44,6 +58,7 @@ public RunTransitionsSpec process(Run run, RunRunnable runRunnable, RunBaseStatu
.builder()
.event(null) //TODO add explicit event
.status(status != null ? status.getState() : null)
.message(status != null ? status.getMessage() : null)
.time(OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public Run updateRunById(

@Operation(summary = "Delete a specific run, with optional cascade")
@DeleteMapping(path = "/{id}")
public void deleteRun(
public Run deleteRun(
@PathVariable @Valid @NotNull @Pattern(regexp = Keys.SLUG_PATTERN) String project,
@PathVariable @Valid @NotNull @Pattern(regexp = Keys.SLUG_PATTERN) String id
) throws NoSuchEntityException {
Expand All @@ -159,7 +159,7 @@ public void deleteRun(
}

//delete via manager
runManager.delete(run);
return runManager.delete(run);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static class Transition {

private RunEvent event;
private String status;
private String message;

@JsonFormat(shape = JsonFormat.Shape.STRING)
protected OffsetDateTime time;
Expand Down
6 changes: 6 additions & 0 deletions modules/commons/.flattened-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,11 @@
<version>4.36.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.14.17</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public class RunnableChangedEvent<R extends RunRunnable> {

private RunnableMonitorObject runMonitorObject;
private R runnable;

/**
 * Returns the identifier of the wrapped runnable.
 *
 * @return the runnable's id, or {@code null} when no runnable is set
 */
public String getId() {
    if (runnable == null) {
        return null;
    }

    return runnable.getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,24 @@
public class RunBaseStatus extends BaseSpec {

private String state;
private String message;

/**
 * Creates a status holding the given state; only {@code state} is assigned here.
 *
 * @param state the state value to store
 */
public RunBaseStatus(String state) {
this.state = state;
}

@Override
public void configure(Map<String, Serializable> data) {
RunBaseStatus meta = mapper.convertValue(data, RunBaseStatus.class);
RunBaseStatus spec = mapper.convertValue(data, RunBaseStatus.class);

this.state = spec.getState();
this.message = spec.getMessage();
}

public static RunBaseStatus with(Map<String, Serializable> data) {
RunBaseStatus spec = new RunBaseStatus();
spec.configure(data);

this.state = meta.getState();
return spec;
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package it.smartcommunitylabdhub.framework.k8s.base;

import it.smartcommunitylabdhub.commons.infrastructure.RunRunnable;
import it.smartcommunitylabdhub.commons.models.base.ExecutableBaseSpec;
import it.smartcommunitylabdhub.commons.models.entities.run.Run;
import it.smartcommunitylabdhub.commons.models.entities.run.RunBaseSpec;
import it.smartcommunitylabdhub.commons.models.entities.run.RunBaseStatus;
import it.smartcommunitylabdhub.commons.runtimes.base.AbstractBaseRuntime;
import it.smartcommunitylabdhub.framework.k8s.kubernetes.K8sBuilderHelper;
import it.smartcommunitylabdhub.framework.k8s.runnables.K8sRunnable;
import jakarta.validation.constraints.NotNull;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class K8sBaseRuntime<
Expand All @@ -23,4 +26,21 @@ protected K8sBaseRuntime(String kind) {
public void setK8sBuilderHelper(K8sBuilderHelper k8sBuilderHelper) {
this.k8sBuilderHelper = k8sBuilderHelper;
}

/**
 * Builds the status to report for a run that ended in error.
 *
 * @param run the run being processed (not read by this implementation)
 * @param runnable the runnable associated with the run, may be {@code null}
 * @return a status carrying the runnable's state and its error as message when
 *         {@code runnable} is a {@link K8sRunnable}; {@code null} otherwise
 */
@SuppressWarnings("unchecked")
@Override
public Z onError(@NotNull Run run, RunRunnable runnable) {
    // instanceof evaluates to false for null, so this guard also covers a missing runnable
    if (!(runnable instanceof K8sRunnable)) {
        return null;
    }

    K8sRunnable failed = (K8sRunnable) runnable;
    RunBaseStatus errorStatus = RunBaseStatus
        .baseBuilder()
        .state(failed.getState())
        .message(failed.getError())
        .build();

    // unchecked: Z is declared on the enclosing class; the cast mirrors the original contract
    return (Z) errorStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public K8sCronJobRunnable refresh(K8sCronJobRunnable runnable) {

if (job == null || job.getStatus() == null) {
// something is missing, no recovery
log.error("Missing or invalid job for {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Job missing or invalid");
}

log.info("Job status: {}", job.getStatus().toString());
Expand Down Expand Up @@ -88,6 +90,7 @@ public K8sCronJobRunnable refresh(K8sCronJobRunnable runnable) {
} catch (K8sFrameworkException e) {
// Set Runnable to ERROR state
runnable.setState(State.ERROR.name());
runnable.setError(e.getClass().getSimpleName() + ":" + String.valueOf(e.getMessage()));
}

return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public K8sDeploymentRunnable refresh(K8sDeploymentRunnable runnable) {
// something is missing, no recovery
log.error("Missing or invalid deployment for {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Deployment missing or invalid");
}

log.debug("deployment status: replicas {}", deployment.getStatus().getReadyReplicas());
Expand Down Expand Up @@ -94,6 +95,7 @@ public K8sDeploymentRunnable refresh(K8sDeploymentRunnable runnable) {
// we observed multiple restarts, stop it
log.error("Multiple restarts observed {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Multiple pod restarts");
}
}

Expand Down Expand Up @@ -147,6 +149,7 @@ public K8sDeploymentRunnable refresh(K8sDeploymentRunnable runnable) {
} catch (K8sFrameworkException e) {
// Set Runnable to ERROR state
runnable.setState(State.ERROR.name());
runnable.setError(e.getClass().getSimpleName() + ":" + String.valueOf(e.getMessage()));
}

return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public K8sJobRunnable refresh(K8sJobRunnable runnable) {
// something is missing, no recovery
log.error("Missing or invalid job for {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Job missing or invalid");
}

if (log.isTraceEnabled()) {
Expand All @@ -54,8 +55,9 @@ public K8sJobRunnable refresh(K8sJobRunnable runnable) {
runnable.setState(State.COMPLETED.name());
} else if (job.getStatus().getFailed() != null && job.getStatus().getFailed().intValue() > 0) {
// Job has failed delete job and pod
log.debug("Job failed succeeded for {}", runnable.getId());
log.debug("Job status failed for {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Job failed: " + job.getStatus().getFailed());
}

//try to fetch pods
Expand Down Expand Up @@ -109,6 +111,7 @@ public K8sJobRunnable refresh(K8sJobRunnable runnable) {
} catch (K8sFrameworkException e) {
// Set Runnable to ERROR state
runnable.setState(State.ERROR.name());
runnable.setError(e.getClass().getSimpleName() + ":" + String.valueOf(e.getMessage()));
}

return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public K8sServeRunnable refresh(K8sServeRunnable runnable) {
// something is missing, no recovery
log.error("Missing or invalid deployment for {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Deployment missing or invalid");
}

log.debug("deployment status: replicas {}", deployment.getStatus().getReadyReplicas());
Expand Down Expand Up @@ -105,6 +106,7 @@ public K8sServeRunnable refresh(K8sServeRunnable runnable) {
// we observed multiple restarts, stop it
log.error("Multiple restarts observed {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Multiple pod restarts");
}
}

Expand Down Expand Up @@ -160,6 +162,7 @@ public K8sServeRunnable refresh(K8sServeRunnable runnable) {
} catch (K8sFrameworkException e) {
// Set Runnable to ERROR state
runnable.setState(State.ERROR.name());
runnable.setError(e.getClass().getSimpleName() + ":" + String.valueOf(e.getMessage()));
}

return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void process(R runnable) {
// Set runnable to error state send event
log.error("Error with k8s for runnable {} {}: {}", clazz.getSimpleName(), runnable.getId(), e.getMessage());
runnable.setState(State.ERROR.name());
runnable.setError(clazz.getSimpleName() + ":" + String.valueOf(e.getMessage()));

try {
log.debug("update runnable {} {} in store", clazz.getSimpleName(), runnable.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class K8sRunnable implements RunRunnable, SecuredRunnable, CredentialsCon

private String state;

private String error;

private Map<String, Serializable> results;

@JsonIgnore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public K8sKanikoRunnable refresh(K8sKanikoRunnable runnable) {

if (job == null || job.getStatus() == null) {
// something is missing, no recovery
log.error("Missing or invalid job for {}", runnable.getId());
runnable.setState(State.ERROR.name());
runnable.setError("Job missing or invalid");
}

log.info("Job status: {}", job.getStatus().toString());
Expand All @@ -50,6 +52,7 @@ public K8sKanikoRunnable refresh(K8sKanikoRunnable runnable) {
} else if (job.getStatus().getFailed() != null && job.getStatus().getFailed().intValue() > 0) {
// Job has failed delete job and pod
runnable.setState(State.ERROR.name());
runnable.setError("Job failed: " + job.getStatus().getFailed());
}

//try to fetch pods
Expand Down Expand Up @@ -91,6 +94,7 @@ public K8sKanikoRunnable refresh(K8sKanikoRunnable runnable) {
} catch (K8sFrameworkException e) {
// Set Runnable to ERROR state
runnable.setState(State.ERROR.name());
runnable.setError(e.getClass().getSimpleName() + ":" + String.valueOf(e.getMessage()));
}

return runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void listen(K8sKanikoRunnable runnable) {
// Set runnable to error state send event
log.error("Error with k8s: {}", e.getMessage());
runnable.setState(State.ERROR.name());
runnable.setError(e.getClass().getSimpleName() + ":" + String.valueOf(e.getMessage()));

try {
runnableStore.store(runnable.getId(), runnable);
Expand Down

0 comments on commit 216d5b2

Please sign in to comment.