diff --git a/pom.xml b/pom.xml index 1fe8b196..56822068 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,7 @@ - 2.4.1 + 2.4.2 -SNAPSHOT 11 diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java index 1a57c9bf..3f94ce45 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTask.java @@ -66,7 +66,7 @@ public RunTask(TaskManagerRepositoryImpl taskManagerRepo, this.eventPubliserer = eventPubliserer; this.taskManagerRepository = taskManagerRepo; this.feilhåndteringalgoritmer = feilhåndteringsalgoritmer; - this.vetoHåndterer = new RunTaskVetoHåndterer(eventPubliserer, taskManagerRepo, taskManagerRepo.getEntityManager()); + this.vetoHåndterer = new RunTaskVetoHåndterer(eventPubliserer, taskManagerRepo.getEntityManager()); } public void doRun(RunTaskInfo taskInfo) { diff --git "a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTaskVetoH\303\245ndterer.java" "b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTaskVetoH\303\245ndterer.java" index 95dbf137..70af8759 100644 --- "a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTaskVetoH\303\245ndterer.java" +++ "b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/RunTaskVetoH\303\245ndterer.java" @@ -22,11 +22,8 @@ public class RunTaskVetoHåndterer { private ProsessTaskEventPubliserer eventPubliserer; private EntityManager em; - private TaskManagerRepositoryImpl taskManagerRepo; - - public RunTaskVetoHåndterer(ProsessTaskEventPubliserer eventPubliserer, TaskManagerRepositoryImpl taskManagerRepo, EntityManager entityManager) { + public RunTaskVetoHåndterer(ProsessTaskEventPubliserer eventPubliserer, EntityManager entityManager) { this.eventPubliserer = eventPubliserer; - this.taskManagerRepo = taskManagerRepo; this.em = entityManager; } @@ -75,8 +72,6 @@ boolean vetoRunTask(ProsessTaskEntitet pte) throws IOException { vetoed = true; Long blokkerId = veto.getBlokkertAvProsessTaskId(); - sjekkKanSetteBlokker(blokkerId); - Feil feil = TaskManagerFeil.FACTORY.kanIkkeKjøreFikkVeto(pte.getId(), pte.getTaskName(), blokkerId, veto.getBegrunnelse()); ProsessTaskFeil taskFeil = new ProsessTaskFeil(pte.tilProsessTask(), feil); taskFeil.setBlokkerendeProsessTaskId(blokkerId); @@ -93,20 +88,4 @@ boolean vetoRunTask(ProsessTaskEntitet pte) throws IOException { return vetoed; } - - private void sjekkKanSetteBlokker(Long blokkerId) { - var blokker = taskManagerRepo.finnOgLåsBlokker(blokkerId); - if (blokker.isPresent()) { - // Vi er på riktig vei, kan sette som blokker - return; - } else { - // skal fortsatt finne her (uten lås), (hvis ikke kastes NoResultException, eks hvis noen har slettet) - var blokkerTask = taskManagerRepo.finn(blokkerId); - - // fikk ikke tak i lås, eller så er task er ferdigkjørt. - // Kan ikke se forskjell herfra, så kaster en midlertidig exception som gjør at må prøve på nytt senere. - throw TaskManagerFeil.FACTORY - .kunneIkkeProsessereTaskVetoForsøkerIgjen(blokkerId, blokkerTask.getTaskName(), blokkerTask.getStatus(), blokkerTask.getGruppe()).toException(); - } - } } diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java index f1610022..d4514236 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManager.java @@ -196,7 +196,8 @@ synchronized void startPollerThread() { } this.pollingServiceScheduledFutures = List.of( pollingService.scheduleWithFixedDelay(new PollAvailableTasks(), delayBetweenPollingMillis / 2, delayBetweenPollingMillis, TimeUnit.MILLISECONDS), - pollingService.scheduleWithFixedDelay(new MoveToDonePartition(), 30 * 1000L, 5 * 60 * 1000L, TimeUnit.MILLISECONDS)); + pollingService.scheduleWithFixedDelay(new MoveToDonePartition(), 30 * 1000L, 5 * 60 * 1000L, TimeUnit.MILLISECONDS), + pollingService.scheduleWithFixedDelay(new FreeBlockedTasks(), 750L, 7500L, TimeUnit.MILLISECONDS)); } synchronized void startTaskThreads() { @@ -461,6 +462,46 @@ public void run() { } + /** Unblokkerer tasks som har blitt konservativt blokkert (der den som kjører ikke ser nye veto påga Read Committed tx isolation level. */ + class FreeBlockedTasks implements Runnable { + + /** splittet fra for å kjøre i {@link TransactionHandler}. */ + private final class DoInNewTransaction extends TransactionHandler { + + Integer doWork() throws Exception { + EntityManager entityManager = getTransactionManagerRepository().getEntityManager(); + try { + return super.apply(entityManager); + } finally { + CDI.current().destroy(entityManager); + } + } + + @Override + protected Integer doWork(EntityManager entityManager) throws Exception { + getTransactionManagerRepository().unblockTasks(); + return 0; + } + } + + /** splittet fra {@link #run()} for å kjøre med ActivateRequestContext. */ + public Integer doWithContext() { + try { + return new DoInNewTransaction().doWork(); + } catch (Throwable t) { // NOSONAR + // logg, ikke rethrow feil her da det dreper trådene + log.error("Kunne ikke unblokkerer tasks som kan frigis", t); + } + return 1; + } + + @Override + public void run() { + RequestContextHandler.doWithRequestContext(this::doWithContext); + } + + } + /** Internal executor that also tracks ids of currently queue or running tasks. */ class IdentExecutorService { diff --git a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRepositoryImpl.java b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRepositoryImpl.java index dca90318..6c5b8485 100644 --- a/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRepositoryImpl.java +++ b/task/src/main/java/no/nav/vedtak/felles/prosesstask/impl/TaskManagerRepositoryImpl.java @@ -332,7 +332,6 @@ List tilProsessTask(List resultList) { @Transactional void verifyStartup() { logDatabaseDetaljer(); - unblokkerTasks(); } private void logDatabaseDetaljer() { @@ -366,19 +365,26 @@ private void logDatabaseDetaljer() { result.dbtz, result.dbtid, userTz, hibernateTz, result.inputtid, result.inputtid2, result.drift); } - private void unblokkerTasks() { - var kjørtStatusKoder = KJØRT_STATUSER.stream().map(ProsessTaskStatus::getDbKode).collect(Collectors.toList()); - var ikkeKjørtStatuser = IKKE_KJØRT_STATUSER.stream().map(ProsessTaskStatus::getDbKode).collect(Collectors.toList()); - - // midlertidig fiks dersom noe er veto som ikke bør være lenger - String sqlUnveto = "update prosess_task a set blokkert_av=NULL" - + " WHERE status IN (:statuserIkkeKjoert)" + /** + * Når vi setter veto/blokkert på en task er vi ikke garantert at ikke den som blokkerer ikke er i ferd med å kjøres/snart er ferdig (fordi + * vi opererer med read committed / ikke serialzable rea) + * istdf å ta lås på blokkerende tasks før hvert veto, ettergår vi bare de som har blitt blokkert og opphever veto dersom det ikke er + * nødvendig lenger. + */ + void unblockTasks() { + String sqlUnveto = "update prosess_task a set " + + " status='KLAR'" + + ", blokkert_av=NULL" + + ", feilede_forsoek=0" + + ", siste_kjoering_feil_kode=NULL" + + ", siste_kjoering_feil_tekst=NULL" + + ", neste_kjoering_etter=NULL" + + ", versjon = versjon +1" + + " WHERE status = 'VETO'" + " AND blokkert_av IS NOT NULL" - + " AND EXISTS (select 1 from prosess_task b where b.id=a.blokkert_av AND b.status IN (:statuserKjoert))"; + + " AND EXISTS (select 1 from prosess_task b where b.id=a.blokkert_av AND b.status IN ('KJOERT', 'FERDIG'))"; int unvetoed = entityManager.createNativeQuery(sqlUnveto) - .setParameter("statuserKjoert", kjørtStatusKoder) - .setParameter("statuserIkkeKjoert", ikkeKjørtStatuser) .executeUpdate(); if (unvetoed > 0) { log.warn("Fjernet veto fra {} tasks som var blokkert av andre tasks som allerede er ferdig");