Skip to content

Commit

Permalink
fix: api for å hente ut kjørende task. (#63)
Browse files Browse the repository at this point in the history
* track task lineage (hvilken task har opprettet en annen ) - eksperimentelt
  • Loading branch information
frode-carlsen authored Jun 12, 2020
1 parent 6a078ec commit a76b34f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import javax.enterprise.context.ApplicationScoped;
Expand All @@ -27,6 +28,7 @@
import org.hibernate.jpa.QueryHints;
import org.hibernate.query.NativeQuery;
import org.hibernate.type.StringType;
import org.slf4j.MDC;

import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskGruppe;
Expand Down Expand Up @@ -154,6 +156,7 @@ public String lagre(ProsessTaskData task) {
protected Long doLagreTask(ProsessTaskData task) {
ProsessTaskEntitet pte;
if (task.getId() != null) {
trackTaskLineage("latest", task);
ProsessTaskStatus nyStatus = task.getStatus();
pte = entityManager.find(ProsessTaskEntitet.class, task.getId());
ProsessTaskStatus status = pte.getStatus();
Expand All @@ -167,6 +170,8 @@ protected Long doLagreTask(ProsessTaskData task) {
eventPubliserer.fireEvent(pte.tilProsessTask(), status, nyStatus);
}
} else {
trackTaskLineage("parent", task);

pte = new ProsessTaskEntitet();
pte.kopierFraNy(task);
pte.setSubjectProvider(subjectProvider);
Expand Down Expand Up @@ -379,4 +384,15 @@ public EntityManager getEntityManager() {
return entityManager;
}

private static void trackTaskLineage(String keyPrefix, ProsessTaskData task) {
Set<String> lineageProps = Set.of(TaskManager.TASK_ID_PROP, TaskManager.TASK_PROP);
lineageProps.forEach(v -> {
var prop = MDC.get(v);
var key = keyPrefix + "." + v;
if (prop != null && task.getPropertyValue(key) == null) {
task.setProperty(key, prop);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import no.nav.vedtak.apptjeneste.AppServiceHandler;
import no.nav.vedtak.felles.jpa.TransactionHandler;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskDispatcher;

/**
Expand Down Expand Up @@ -102,6 +103,12 @@ public class TaskManager implements AppServiceHandler {
private final AtomicReference<LocalDateTime> pollerRoundNoneFoundSince = new AtomicReference<>(LocalDateTime.now());
private final AtomicReference<LocalDateTime> pollerRoundNoneLastReported = new AtomicReference<>(LocalDateTime.now());

/** trenger ikke ha denne som static siden TaskManager er ApplicationScoped. */
private final ThreadLocal<ProsessTaskData> currentTask = new ThreadLocal<>();

static final String TASK_PROP = "prosess_task";
static final String TASK_ID_PROP = "prosess_task_id";

public TaskManager() {
}

Expand All @@ -111,7 +118,28 @@ public TaskManager(TaskManagerRepositoryImpl taskManagerRepository, @Any Instanc
this.taskManagerRepository = taskManagerRepository;

if (dispatcher != null) {
this.taskDispatcher = selectProsessTaskDispatcher(dispatcher);

/** Holder styr på kjørende task per tråd slik at vi kan i enkelttilfeller hente ut info om det på en tråd. */
class TrackCurrentDispatchedTask implements ProsessTaskDispatcher {
ProsessTaskDispatcher delegate = selectProsessTaskDispatcher(dispatcher);

@Override
public boolean feilhåndterException(String taskType, Throwable e) {
return delegate.feilhåndterException(taskType, e);
}

@Override
public void dispatch(ProsessTaskData task) throws Exception {
try {
currentTask.set(task);
delegate.dispatch(task);
} finally {
currentTask.remove();
}
}
}

this.taskDispatcher = new TrackCurrentDispatchedTask();
}
}

Expand Down Expand Up @@ -186,6 +214,10 @@ public synchronized void stop() {
}
}

public ProsessTaskData getCurrentTask() {
return currentTask.get();
}

synchronized void startPollerThread() {
if (pollingServiceScheduledFutures != null) {
throw new IllegalStateException("Service allerede startet, stopp først");//$NON-NLS-1$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskData;
import no.nav.vedtak.felles.prosesstask.api.ProsessTaskDispatcher;
import no.nav.vedtak.felles.prosesstask.impl.TaskManager.ReadTaskFunksjon;
import no.nav.vedtak.log.mdc.MdcExtendedLogContext;

/** Poller for tilgjengelige tasks og omsetter disse til Runnable som kan kjøres på andre tråder. */
public class TaskManagerGenerateRunnableTasks {
static final Logger log = LoggerFactory.getLogger(TaskManagerGenerateRunnableTasks.class);
static final MdcExtendedLogContext LOG_CONTEXT = MdcExtendedLogContext.getContext("prosess"); //$NON-NLS-1$
static final CDI<Object> CURRENT = CDI.current();

private final BiFunction<Integer, ReadTaskFunksjon, List<IdentRunnable>> availableTasksFunc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void unblockTasks() {
int unvetoed = entityManager.createNativeQuery(sqlUnveto)
.executeUpdate();
if (unvetoed > 0) {
log.warn("Fjernet veto fra {} tasks som var blokkert av andre tasks som allerede er ferdig");
log.warn("Fjernet veto fra {} tasks som var blokkert av andre tasks som allerede er ferdig", unvetoed);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ IdentRunnable lagErrorCallback(final RunTaskInfo taskInfo, final String callId,
return new IdentRunnableTask(taskInfo.getId(), errorCallback, LocalDateTime.now());
}

void clearLogContext() {
static void clearLogContext() {
MDC.clear();
}

void initLogContext(final String callId, String taskName, Long taskId) {
static void initLogContext(final String callId, String taskName, Long taskId) {
if (callId != null) {
MDC.put(CallId.CALL_ID, callId);
} else {
MDC.put(CallId.CALL_ID, CallId.generateCallId());
}
TaskManagerGenerateRunnableTasks.LOG_CONTEXT.add("task", taskName);
TaskManagerGenerateRunnableTasks.LOG_CONTEXT.add("task_id", taskId);
MDC.put(TaskManager.TASK_PROP, taskName);
MDC.put(TaskManager.TASK_ID_PROP, taskId.toString());
}

void handleErrorCallback(IdentRunnable errorCallback) {
Expand Down

0 comments on commit a76b34f

Please sign in to comment.